Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 71e114da

History | View | Annotate | Download (16.8 kB)

1 e4ccf6cd Guido Trotter
#
2 e4ccf6cd Guido Trotter
#
3 e4ccf6cd Guido Trotter
4 e4ccf6cd Guido Trotter
# Copyright (C) 2009 Google Inc.
5 e4ccf6cd Guido Trotter
#
6 e4ccf6cd Guido Trotter
# This program is free software; you can redistribute it and/or modify
7 e4ccf6cd Guido Trotter
# it under the terms of the GNU General Public License as published by
8 e4ccf6cd Guido Trotter
# the Free Software Foundation; either version 2 of the License, or
9 e4ccf6cd Guido Trotter
# (at your option) any later version.
10 e4ccf6cd Guido Trotter
#
11 e4ccf6cd Guido Trotter
# This program is distributed in the hope that it will be useful, but
12 e4ccf6cd Guido Trotter
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 e4ccf6cd Guido Trotter
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 e4ccf6cd Guido Trotter
# General Public License for more details.
15 e4ccf6cd Guido Trotter
#
16 e4ccf6cd Guido Trotter
# You should have received a copy of the GNU General Public License
17 e4ccf6cd Guido Trotter
# along with this program; if not, write to the Free Software
18 e4ccf6cd Guido Trotter
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 e4ccf6cd Guido Trotter
# 02110-1301, USA.
20 e4ccf6cd Guido Trotter
21 e4ccf6cd Guido Trotter
22 e4ccf6cd Guido Trotter
"""Ganeti confd client
23 e4ccf6cd Guido Trotter

24 cf7b0cc4 Guido Trotter
Clients can use the confd client library to send requests to a group of master
25 cf7b0cc4 Guido Trotter
candidates running confd. The expected usage is through the asyncore framework,
26 cf7b0cc4 Guido Trotter
by sending queries, and asynchronously receiving replies through a callback.
27 cf7b0cc4 Guido Trotter

28 cf7b0cc4 Guido Trotter
This way the client library doesn't ever need to "wait" on a particular answer,
29 cf7b0cc4 Guido Trotter
and can proceed even if some udp packets are lost. It's up to the user to
30 cf7b0cc4 Guido Trotter
reschedule queries if they haven't received responses and they need them.
31 cf7b0cc4 Guido Trotter

32 69b99987 Michael Hanselmann
Example usage::
33 69b99987 Michael Hanselmann

34 cf7b0cc4 Guido Trotter
  client = ConfdClient(...) # includes callback specification
35 cf7b0cc4 Guido Trotter
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36 cf7b0cc4 Guido Trotter
  client.SendRequest(req)
37 cf7b0cc4 Guido Trotter
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38 cf7b0cc4 Guido Trotter
  # ... wait ...
39 cf7b0cc4 Guido Trotter
  # And your callback will be called by asyncore, when your query gets a
40 cf7b0cc4 Guido Trotter
  # response, or when it expires.
41 cf7b0cc4 Guido Trotter

42 392ca296 Guido Trotter
You can use the provided ConfdFilterCallback to act as a filter, only passing
43 392ca296 Guido Trotter
"newer" answer to your callback, and filtering out outdated ones, or ones
44 392ca296 Guido Trotter
confirming what you already got.
45 392ca296 Guido Trotter

46 e4ccf6cd Guido Trotter
"""
47 69b99987 Michael Hanselmann
48 6c881c52 Iustin Pop
# pylint: disable-msg=E0203
49 6c881c52 Iustin Pop
50 6c881c52 Iustin Pop
# E0203: Access to member %r before its definition, since we use
51 6c881c52 Iustin Pop
# objects.py which doesn't explicitely initialise its members
52 6c881c52 Iustin Pop
53 e4ccf6cd Guido Trotter
import time
54 e4ccf6cd Guido Trotter
import random
55 e4ccf6cd Guido Trotter
56 e4ccf6cd Guido Trotter
from ganeti import utils
57 e4ccf6cd Guido Trotter
from ganeti import constants
58 e4ccf6cd Guido Trotter
from ganeti import objects
59 e4ccf6cd Guido Trotter
from ganeti import serializer
60 e4ccf6cd Guido Trotter
from ganeti import daemon # contains AsyncUDPSocket
61 e4ccf6cd Guido Trotter
from ganeti import errors
62 e4ccf6cd Guido Trotter
from ganeti import confd
63 5b349fd1 Iustin Pop
from ganeti import ssconf
64 e4ccf6cd Guido Trotter
65 e4ccf6cd Guido Trotter
66 e4ccf6cd Guido Trotter
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
67 e4ccf6cd Guido Trotter
  """Confd udp asyncore client
68 e4ccf6cd Guido Trotter

69 e4ccf6cd Guido Trotter
  This is kept separate from the main ConfdClient to make sure it's easy to
70 e4ccf6cd Guido Trotter
  implement a non-asyncore based client library.
71 e4ccf6cd Guido Trotter

72 e4ccf6cd Guido Trotter
  """
73 e4ccf6cd Guido Trotter
  def __init__(self, client):
74 e4ccf6cd Guido Trotter
    """Constructor for ConfdAsyncUDPClient
75 e4ccf6cd Guido Trotter

76 e4ccf6cd Guido Trotter
    @type client: L{ConfdClient}
77 e4ccf6cd Guido Trotter
    @param client: client library, to pass the datagrams to
78 e4ccf6cd Guido Trotter

79 e4ccf6cd Guido Trotter
    """
80 e4ccf6cd Guido Trotter
    daemon.AsyncUDPSocket.__init__(self)
81 e4ccf6cd Guido Trotter
    self.client = client
82 e4ccf6cd Guido Trotter
83 e4ccf6cd Guido Trotter
  # this method is overriding a daemon.AsyncUDPSocket method
84 e4ccf6cd Guido Trotter
  def handle_datagram(self, payload, ip, port):
85 e4ccf6cd Guido Trotter
    self.client.HandleResponse(payload, ip, port)
86 e4ccf6cd Guido Trotter
87 e4ccf6cd Guido Trotter
88 71e114da Iustin Pop
class _Request(object):
89 71e114da Iustin Pop
  """Request status structure.
90 71e114da Iustin Pop

91 71e114da Iustin Pop
  @ivar request: the request data
92 71e114da Iustin Pop
  @ivar args: any extra arguments for the callback
93 71e114da Iustin Pop
  @ivar expiry: the expiry timestamp of the request
94 71e114da Iustin Pop

95 71e114da Iustin Pop
  """
96 71e114da Iustin Pop
  def __init__(self, request, args, expiry):
97 71e114da Iustin Pop
    self.request = request
98 71e114da Iustin Pop
    self.args = args
99 71e114da Iustin Pop
    self.expiry = expiry
100 71e114da Iustin Pop
101 71e114da Iustin Pop
102 e4ccf6cd Guido Trotter
class ConfdClient:
103 e4ccf6cd Guido Trotter
  """Send queries to confd, and get back answers.
104 e4ccf6cd Guido Trotter

105 e4ccf6cd Guido Trotter
  Since the confd model works by querying multiple master candidates, and
106 e4ccf6cd Guido Trotter
  getting back answers, this is an asynchronous library. It can either work
107 e4ccf6cd Guido Trotter
  through asyncore or with your own handling.
108 e4ccf6cd Guido Trotter

109 71e114da Iustin Pop
  @type _requests: dict
110 71e114da Iustin Pop
  @ivar _requests: dictionary indexes by salt, which contains data
111 71e114da Iustin Pop
      about the outstanding requests; the values are objects of type
112 71e114da Iustin Pop
      L{_Request}
113 71e114da Iustin Pop

114 e4ccf6cd Guido Trotter
  """
115 a3db74e4 Guido Trotter
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
116 e4ccf6cd Guido Trotter
    """Constructor for ConfdClient
117 e4ccf6cd Guido Trotter

118 e4ccf6cd Guido Trotter
    @type hmac_key: string
119 e4ccf6cd Guido Trotter
    @param hmac_key: hmac key to talk to confd
120 e4ccf6cd Guido Trotter
    @type peers: list
121 e4ccf6cd Guido Trotter
    @param peers: list of peer nodes
122 96e03b0b Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
123 96e03b0b Guido Trotter
    @param callback: function to call when getting answers
124 7d20c647 Guido Trotter
    @type port: integer
125 d63997b3 Guido Trotter
    @param port: confd port (default: use GetDaemonPort)
126 69b99987 Michael Hanselmann
    @type logger: logging.Logger
127 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
128 e4ccf6cd Guido Trotter

129 e4ccf6cd Guido Trotter
    """
130 96e03b0b Guido Trotter
    if not callable(callback):
131 96e03b0b Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
132 e4ccf6cd Guido Trotter
133 a5229439 Guido Trotter
    self.UpdatePeerList(peers)
134 e4ccf6cd Guido Trotter
    self._hmac_key = hmac_key
135 e4ccf6cd Guido Trotter
    self._socket = ConfdAsyncUDPClient(self)
136 96e03b0b Guido Trotter
    self._callback = callback
137 7d20c647 Guido Trotter
    self._confd_port = port
138 a3db74e4 Guido Trotter
    self._logger = logger
139 96e03b0b Guido Trotter
    self._requests = {}
140 7d20c647 Guido Trotter
141 7d20c647 Guido Trotter
    if self._confd_port is None:
142 7d20c647 Guido Trotter
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
143 e4ccf6cd Guido Trotter
144 a5229439 Guido Trotter
  def UpdatePeerList(self, peers):
145 a5229439 Guido Trotter
    """Update the list of peers
146 a5229439 Guido Trotter

147 a5229439 Guido Trotter
    @type peers: list
148 a5229439 Guido Trotter
    @param peers: list of peer nodes
149 a5229439 Guido Trotter

150 a5229439 Guido Trotter
    """
151 e11ddf13 Iustin Pop
    # we are actually called from init, so:
152 e11ddf13 Iustin Pop
    # pylint: disable-msg=W0201
153 a5229439 Guido Trotter
    if not isinstance(peers, list):
154 a5229439 Guido Trotter
      raise errors.ProgrammerError("peers must be a list")
155 db169865 Guido Trotter
    # make a copy of peers, since we're going to shuffle the list, later
156 db169865 Guido Trotter
    self._peers = list(peers)
157 a5229439 Guido Trotter
158 e4ccf6cd Guido Trotter
  def _PackRequest(self, request, now=None):
159 e4ccf6cd Guido Trotter
    """Prepare a request to be sent on the wire.
160 e4ccf6cd Guido Trotter

161 e4ccf6cd Guido Trotter
    This function puts a proper salt in a confd request, puts the proper salt,
162 e4ccf6cd Guido Trotter
    and adds the correct magic number.
163 e4ccf6cd Guido Trotter

164 e4ccf6cd Guido Trotter
    """
165 e4ccf6cd Guido Trotter
    if now is None:
166 e4ccf6cd Guido Trotter
      now = time.time()
167 e4ccf6cd Guido Trotter
    tstamp = '%d' % now
168 e4ccf6cd Guido Trotter
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
169 e4ccf6cd Guido Trotter
    return confd.PackMagic(req)
170 e4ccf6cd Guido Trotter
171 e4ccf6cd Guido Trotter
  def _UnpackReply(self, payload):
172 e4ccf6cd Guido Trotter
    in_payload = confd.UnpackMagic(payload)
173 c103d7ae Guido Trotter
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
174 c103d7ae Guido Trotter
    answer = objects.ConfdReply.FromDict(dict_answer)
175 e4ccf6cd Guido Trotter
    return answer, salt
176 e4ccf6cd Guido Trotter
177 96e03b0b Guido Trotter
  def ExpireRequests(self):
178 96e03b0b Guido Trotter
    """Delete all the expired requests.
179 e4ccf6cd Guido Trotter

180 e4ccf6cd Guido Trotter
    """
181 e4ccf6cd Guido Trotter
    now = time.time()
182 71e114da Iustin Pop
    for rsalt, rq in self._requests.items():
183 71e114da Iustin Pop
      if now >= rq.expiry:
184 96e03b0b Guido Trotter
        del self._requests[rsalt]
185 96e03b0b Guido Trotter
        client_reply = ConfdUpcallPayload(salt=rsalt,
186 96e03b0b Guido Trotter
                                          type=UPCALL_EXPIRE,
187 71e114da Iustin Pop
                                          orig_request=rq.request,
188 71e114da Iustin Pop
                                          extra_args=rq.args,
189 5f6f260a Guido Trotter
                                          client=self,
190 5f6f260a Guido Trotter
                                          )
191 96e03b0b Guido Trotter
        self._callback(client_reply)
192 e4ccf6cd Guido Trotter
193 d63997b3 Guido Trotter
  def SendRequest(self, request, args=None, coverage=None, async=True):
194 e4ccf6cd Guido Trotter
    """Send a confd request to some MCs
195 e4ccf6cd Guido Trotter

196 e4ccf6cd Guido Trotter
    @type request: L{objects.ConfdRequest}
197 e4ccf6cd Guido Trotter
    @param request: the request to send
198 e4ccf6cd Guido Trotter
    @type args: tuple
199 d63997b3 Guido Trotter
    @param args: additional callback arguments
200 e4ccf6cd Guido Trotter
    @type coverage: integer
201 d63997b3 Guido Trotter
    @param coverage: number of remote nodes to contact
202 8496d93c Guido Trotter
    @type async: boolean
203 8496d93c Guido Trotter
    @param async: handle the write asynchronously
204 e4ccf6cd Guido Trotter

205 e4ccf6cd Guido Trotter
    """
206 e4ccf6cd Guido Trotter
    if coverage is None:
207 e4ccf6cd Guido Trotter
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
208 e4ccf6cd Guido Trotter
209 e4ccf6cd Guido Trotter
    if coverage > len(self._peers):
210 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Not enough MCs known to provide the"
211 e4ccf6cd Guido Trotter
                                    " desired coverage")
212 e4ccf6cd Guido Trotter
213 e4ccf6cd Guido Trotter
    if not request.rsalt:
214 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Missing request rsalt")
215 e4ccf6cd Guido Trotter
216 96e03b0b Guido Trotter
    self.ExpireRequests()
217 96e03b0b Guido Trotter
    if request.rsalt in self._requests:
218 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Duplicate request rsalt")
219 e4ccf6cd Guido Trotter
220 e4ccf6cd Guido Trotter
    if request.type not in constants.CONFD_REQS:
221 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
222 e4ccf6cd Guido Trotter
223 e4ccf6cd Guido Trotter
    random.shuffle(self._peers)
224 e4ccf6cd Guido Trotter
    targets = self._peers[:coverage]
225 e4ccf6cd Guido Trotter
226 e4ccf6cd Guido Trotter
    now = time.time()
227 e4ccf6cd Guido Trotter
    payload = self._PackRequest(request, now=now)
228 e4ccf6cd Guido Trotter
229 e4ccf6cd Guido Trotter
    for target in targets:
230 e4ccf6cd Guido Trotter
      try:
231 e4ccf6cd Guido Trotter
        self._socket.enqueue_send(target, self._confd_port, payload)
232 e4ccf6cd Guido Trotter
      except errors.UdpDataSizeError:
233 e4ccf6cd Guido Trotter
        raise errors.ConfdClientError("Request too big")
234 e4ccf6cd Guido Trotter
235 e4ccf6cd Guido Trotter
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
236 71e114da Iustin Pop
    self._requests[request.rsalt] = _Request(request, args, expire_time)
237 e4ccf6cd Guido Trotter
238 8496d93c Guido Trotter
    if not async:
239 8496d93c Guido Trotter
      self.FlushSendQueue()
240 8496d93c Guido Trotter
241 e4ccf6cd Guido Trotter
  def HandleResponse(self, payload, ip, port):
242 e4ccf6cd Guido Trotter
    """Asynchronous handler for a confd reply
243 e4ccf6cd Guido Trotter

244 e4ccf6cd Guido Trotter
    Call the relevant callback associated to the current request.
245 e4ccf6cd Guido Trotter

246 e4ccf6cd Guido Trotter
    """
247 e4ccf6cd Guido Trotter
    try:
248 e4ccf6cd Guido Trotter
      try:
249 e4ccf6cd Guido Trotter
        answer, salt = self._UnpackReply(payload)
250 a3db74e4 Guido Trotter
      except (errors.SignatureError, errors.ConfdMagicError), err:
251 a3db74e4 Guido Trotter
        if self._logger:
252 a3db74e4 Guido Trotter
          self._logger.debug("Discarding broken package: %s" % err)
253 e4ccf6cd Guido Trotter
        return
254 e4ccf6cd Guido Trotter
255 e4ccf6cd Guido Trotter
      try:
256 71e114da Iustin Pop
        rq = self._requests[salt]
257 e4ccf6cd Guido Trotter
      except KeyError:
258 a3db74e4 Guido Trotter
        if self._logger:
259 a3db74e4 Guido Trotter
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
260 96e03b0b Guido Trotter
        return
261 96e03b0b Guido Trotter
262 96e03b0b Guido Trotter
      client_reply = ConfdUpcallPayload(salt=salt,
263 96e03b0b Guido Trotter
                                        type=UPCALL_REPLY,
264 96e03b0b Guido Trotter
                                        server_reply=answer,
265 71e114da Iustin Pop
                                        orig_request=rq.request,
266 96e03b0b Guido Trotter
                                        server_ip=ip,
267 96e03b0b Guido Trotter
                                        server_port=port,
268 71e114da Iustin Pop
                                        extra_args=rq.args,
269 5f6f260a Guido Trotter
                                        client=self,
270 5f6f260a Guido Trotter
                                       )
271 96e03b0b Guido Trotter
      self._callback(client_reply)
272 e4ccf6cd Guido Trotter
273 e4ccf6cd Guido Trotter
    finally:
274 96e03b0b Guido Trotter
      self.ExpireRequests()
275 96e03b0b Guido Trotter
276 8496d93c Guido Trotter
  def FlushSendQueue(self):
277 8496d93c Guido Trotter
    """Send out all pending requests.
278 8496d93c Guido Trotter

279 8496d93c Guido Trotter
    Can be used for synchronous client use.
280 8496d93c Guido Trotter

281 8496d93c Guido Trotter
    """
282 8496d93c Guido Trotter
    while self._socket.writable():
283 8496d93c Guido Trotter
      self._socket.handle_write()
284 8496d93c Guido Trotter
285 8496d93c Guido Trotter
  def ReceiveReply(self, timeout=1):
286 8496d93c Guido Trotter
    """Receive one reply.
287 8496d93c Guido Trotter

288 8496d93c Guido Trotter
    @type timeout: float
289 8496d93c Guido Trotter
    @param timeout: how long to wait for the reply
290 8496d93c Guido Trotter
    @rtype: boolean
291 8496d93c Guido Trotter
    @return: True if some data has been handled, False otherwise
292 8496d93c Guido Trotter

293 8496d93c Guido Trotter
    """
294 8496d93c Guido Trotter
    return self._socket.process_next_packet(timeout=timeout)
295 8496d93c Guido Trotter
296 96e03b0b Guido Trotter
297 96e03b0b Guido Trotter
# UPCALL_REPLY: server reply upcall
298 96e03b0b Guido Trotter
# has all ConfdUpcallPayload fields populated
299 96e03b0b Guido Trotter
UPCALL_REPLY = 1
300 96e03b0b Guido Trotter
# UPCALL_EXPIRE: internal library request expire
301 96e03b0b Guido Trotter
# has only salt, type, orig_request and extra_args
302 96e03b0b Guido Trotter
UPCALL_EXPIRE = 2
303 96e03b0b Guido Trotter
CONFD_UPCALL_TYPES = frozenset([
304 96e03b0b Guido Trotter
  UPCALL_REPLY,
305 96e03b0b Guido Trotter
  UPCALL_EXPIRE,
306 96e03b0b Guido Trotter
  ])
307 96e03b0b Guido Trotter
308 96e03b0b Guido Trotter
309 96e03b0b Guido Trotter
class ConfdUpcallPayload(objects.ConfigObject):
310 96e03b0b Guido Trotter
  """Callback argument for confd replies
311 96e03b0b Guido Trotter

312 96e03b0b Guido Trotter
  @type salt: string
313 96e03b0b Guido Trotter
  @ivar salt: salt associated with the query
314 96e03b0b Guido Trotter
  @type type: one of confd.client.CONFD_UPCALL_TYPES
315 96e03b0b Guido Trotter
  @ivar type: upcall type (server reply, expired request, ...)
316 96e03b0b Guido Trotter
  @type orig_request: L{objects.ConfdRequest}
317 96e03b0b Guido Trotter
  @ivar orig_request: original request
318 96e03b0b Guido Trotter
  @type server_reply: L{objects.ConfdReply}
319 96e03b0b Guido Trotter
  @ivar server_reply: server reply
320 96e03b0b Guido Trotter
  @type server_ip: string
321 96e03b0b Guido Trotter
  @ivar server_ip: answering server ip address
322 96e03b0b Guido Trotter
  @type server_port: int
323 96e03b0b Guido Trotter
  @ivar server_port: answering server port
324 96e03b0b Guido Trotter
  @type extra_args: any
325 96e03b0b Guido Trotter
  @ivar extra_args: 'args' argument of the SendRequest function
326 5f6f260a Guido Trotter
  @type client: L{ConfdClient}
327 5f6f260a Guido Trotter
  @ivar client: current confd client instance
328 96e03b0b Guido Trotter

329 96e03b0b Guido Trotter
  """
330 96e03b0b Guido Trotter
  __slots__ = [
331 96e03b0b Guido Trotter
    "salt",
332 96e03b0b Guido Trotter
    "type",
333 96e03b0b Guido Trotter
    "orig_request",
334 96e03b0b Guido Trotter
    "server_reply",
335 96e03b0b Guido Trotter
    "server_ip",
336 96e03b0b Guido Trotter
    "server_port",
337 96e03b0b Guido Trotter
    "extra_args",
338 5f6f260a Guido Trotter
    "client",
339 96e03b0b Guido Trotter
    ]
340 e4ccf6cd Guido Trotter
341 e4ccf6cd Guido Trotter
342 e4ccf6cd Guido Trotter
class ConfdClientRequest(objects.ConfdRequest):
343 e4ccf6cd Guido Trotter
  """This is the client-side version of ConfdRequest.
344 e4ccf6cd Guido Trotter

345 e4ccf6cd Guido Trotter
  This version of the class helps creating requests, on the client side, by
346 e4ccf6cd Guido Trotter
  filling in some default values.
347 e4ccf6cd Guido Trotter

348 e4ccf6cd Guido Trotter
  """
349 e4ccf6cd Guido Trotter
  def __init__(self, **kwargs):
350 e4ccf6cd Guido Trotter
    objects.ConfdRequest.__init__(self, **kwargs)
351 e4ccf6cd Guido Trotter
    if not self.rsalt:
352 e4ccf6cd Guido Trotter
      self.rsalt = utils.NewUUID()
353 e4ccf6cd Guido Trotter
    if not self.protocol:
354 e4ccf6cd Guido Trotter
      self.protocol = constants.CONFD_PROTOCOL_VERSION
355 e4ccf6cd Guido Trotter
    if self.type not in constants.CONFD_REQS:
356 e4ccf6cd Guido Trotter
      raise errors.ConfdClientError("Invalid request type")
357 e4ccf6cd Guido Trotter
358 392ca296 Guido Trotter
359 392ca296 Guido Trotter
class ConfdFilterCallback:
360 392ca296 Guido Trotter
  """Callback that calls another callback, but filters duplicate results.
361 392ca296 Guido Trotter

362 49b3fdac Iustin Pop
  @ivar consistent: a dictionary indexed by salt; for each salt, if
363 49b3fdac Iustin Pop
      all responses ware identical, this will be True; this is the
364 49b3fdac Iustin Pop
      expected state on a healthy cluster; on inconsistent or
365 49b3fdac Iustin Pop
      partitioned clusters, this might be False, if we see answers
366 49b3fdac Iustin Pop
      with the same serial but different contents
367 49b3fdac Iustin Pop

368 392ca296 Guido Trotter
  """
369 392ca296 Guido Trotter
  def __init__(self, callback, logger=None):
370 392ca296 Guido Trotter
    """Constructor for ConfdFilterCallback
371 392ca296 Guido Trotter

372 392ca296 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
373 392ca296 Guido Trotter
    @param callback: function to call when getting answers
374 69b99987 Michael Hanselmann
    @type logger: logging.Logger
375 d63997b3 Guido Trotter
    @param logger: optional logger for internal conditions
376 392ca296 Guido Trotter

377 392ca296 Guido Trotter
    """
378 392ca296 Guido Trotter
    if not callable(callback):
379 392ca296 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
380 392ca296 Guido Trotter
381 392ca296 Guido Trotter
    self._callback = callback
382 392ca296 Guido Trotter
    self._logger = logger
383 392ca296 Guido Trotter
    # answers contains a dict of salt -> answer
384 392ca296 Guido Trotter
    self._answers = {}
385 49b3fdac Iustin Pop
    self.consistent = {}
386 392ca296 Guido Trotter
387 392ca296 Guido Trotter
  def _LogFilter(self, salt, new_reply, old_reply):
388 392ca296 Guido Trotter
    if not self._logger:
389 392ca296 Guido Trotter
      return
390 392ca296 Guido Trotter
391 392ca296 Guido Trotter
    if new_reply.serial > old_reply.serial:
392 392ca296 Guido Trotter
      self._logger.debug("Filtering confirming answer, with newer"
393 392ca296 Guido Trotter
                         " serial for query %s" % salt)
394 392ca296 Guido Trotter
    elif new_reply.serial == old_reply.serial:
395 392ca296 Guido Trotter
      if new_reply.answer != old_reply.answer:
396 392ca296 Guido Trotter
        self._logger.warning("Got incoherent answers for query %s"
397 392ca296 Guido Trotter
                             " (serial: %s)" % (salt, new_reply.serial))
398 392ca296 Guido Trotter
      else:
399 392ca296 Guido Trotter
        self._logger.debug("Filtering confirming answer, with same"
400 392ca296 Guido Trotter
                           " serial for query %s" % salt)
401 392ca296 Guido Trotter
    else:
402 392ca296 Guido Trotter
      self._logger.debug("Filtering outdated answer for query %s"
403 392ca296 Guido Trotter
                         " serial: (%d < %d)" % (salt, old_reply.serial,
404 392ca296 Guido Trotter
                                                 new_reply.serial))
405 392ca296 Guido Trotter
406 392ca296 Guido Trotter
  def _HandleExpire(self, up):
407 392ca296 Guido Trotter
    # if we have no answer we have received none, before the expiration.
408 a9613def Guido Trotter
    if up.salt in self._answers:
409 a9613def Guido Trotter
      del self._answers[up.salt]
410 49b3fdac Iustin Pop
    if up.salt in self.consistent:
411 49b3fdac Iustin Pop
      del self.consistent[up.salt]
412 392ca296 Guido Trotter
413 392ca296 Guido Trotter
  def _HandleReply(self, up):
414 392ca296 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
415 392ca296 Guido Trotter

416 392ca296 Guido Trotter
    @rtype: boolean
417 392ca296 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
418 392ca296 Guido Trotter
             on to the up-callback
419 392ca296 Guido Trotter

420 392ca296 Guido Trotter
    """
421 392ca296 Guido Trotter
    filter_upcall = False
422 392ca296 Guido Trotter
    salt = up.salt
423 49b3fdac Iustin Pop
    if salt not in self.consistent:
424 49b3fdac Iustin Pop
      self.consistent[salt] = True
425 392ca296 Guido Trotter
    if salt not in self._answers:
426 392ca296 Guido Trotter
      # first answer for a query (don't filter, and record)
427 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
428 392ca296 Guido Trotter
    elif up.server_reply.serial > self._answers[salt].serial:
429 392ca296 Guido Trotter
      # newer answer (record, and compare contents)
430 392ca296 Guido Trotter
      old_answer = self._answers[salt]
431 392ca296 Guido Trotter
      self._answers[salt] = up.server_reply
432 392ca296 Guido Trotter
      if up.server_reply.answer == old_answer.answer:
433 392ca296 Guido Trotter
        # same content (filter) (version upgrade was unrelated)
434 392ca296 Guido Trotter
        filter_upcall = True
435 392ca296 Guido Trotter
        self._LogFilter(salt, up.server_reply, old_answer)
436 392ca296 Guido Trotter
      # else: different content, pass up a second answer
437 392ca296 Guido Trotter
    else:
438 392ca296 Guido Trotter
      # older or same-version answer (duplicate or outdated, filter)
439 39292d3a Iustin Pop
      if (up.server_reply.serial == self._answers[salt].serial and
440 39292d3a Iustin Pop
          up.server_reply.answer != self._answers[salt].answer):
441 49b3fdac Iustin Pop
        self.consistent[salt] = False
442 392ca296 Guido Trotter
      filter_upcall = True
443 392ca296 Guido Trotter
      self._LogFilter(salt, up.server_reply, self._answers[salt])
444 392ca296 Guido Trotter
445 392ca296 Guido Trotter
    return filter_upcall
446 392ca296 Guido Trotter
447 392ca296 Guido Trotter
  def __call__(self, up):
448 392ca296 Guido Trotter
    """Filtering callback
449 392ca296 Guido Trotter

450 392ca296 Guido Trotter
    @type up: L{ConfdUpcallPayload}
451 392ca296 Guido Trotter
    @param up: upper callback
452 392ca296 Guido Trotter

453 392ca296 Guido Trotter
    """
454 392ca296 Guido Trotter
    filter_upcall = False
455 392ca296 Guido Trotter
    if up.type == UPCALL_REPLY:
456 392ca296 Guido Trotter
      filter_upcall = self._HandleReply(up)
457 392ca296 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
458 392ca296 Guido Trotter
      self._HandleExpire(up)
459 392ca296 Guido Trotter
460 392ca296 Guido Trotter
    if not filter_upcall:
461 392ca296 Guido Trotter
      self._callback(up)
462 04cdf663 Guido Trotter
463 04cdf663 Guido Trotter
464 04cdf663 Guido Trotter
class ConfdCountingCallback:
465 04cdf663 Guido Trotter
  """Callback that calls another callback, and counts the answers
466 04cdf663 Guido Trotter

467 04cdf663 Guido Trotter
  """
468 04cdf663 Guido Trotter
  def __init__(self, callback, logger=None):
469 04cdf663 Guido Trotter
    """Constructor for ConfdCountingCallback
470 04cdf663 Guido Trotter

471 04cdf663 Guido Trotter
    @type callback: f(L{ConfdUpcallPayload})
472 04cdf663 Guido Trotter
    @param callback: function to call when getting answers
473 04cdf663 Guido Trotter
    @type logger: logging.Logger
474 04cdf663 Guido Trotter
    @param logger: optional logger for internal conditions
475 04cdf663 Guido Trotter

476 04cdf663 Guido Trotter
    """
477 04cdf663 Guido Trotter
    if not callable(callback):
478 04cdf663 Guido Trotter
      raise errors.ProgrammerError("callback must be callable")
479 04cdf663 Guido Trotter
480 04cdf663 Guido Trotter
    self._callback = callback
481 04cdf663 Guido Trotter
    self._logger = logger
482 04cdf663 Guido Trotter
    # answers contains a dict of salt -> count
483 04cdf663 Guido Trotter
    self._answers = {}
484 04cdf663 Guido Trotter
485 04cdf663 Guido Trotter
  def RegisterQuery(self, salt):
486 04cdf663 Guido Trotter
    if salt in self._answers:
487 04cdf663 Guido Trotter
      raise errors.ProgrammerError("query already registered")
488 04cdf663 Guido Trotter
    self._answers[salt] = 0
489 04cdf663 Guido Trotter
490 04cdf663 Guido Trotter
  def AllAnswered(self):
491 04cdf663 Guido Trotter
    """Have all the registered queries received at least an answer?
492 04cdf663 Guido Trotter

493 04cdf663 Guido Trotter
    """
494 04cdf663 Guido Trotter
    return utils.all(self._answers.values())
495 04cdf663 Guido Trotter
496 04cdf663 Guido Trotter
  def _HandleExpire(self, up):
497 04cdf663 Guido Trotter
    # if we have no answer we have received none, before the expiration.
498 04cdf663 Guido Trotter
    if up.salt in self._answers:
499 04cdf663 Guido Trotter
      del self._answers[up.salt]
500 04cdf663 Guido Trotter
501 04cdf663 Guido Trotter
  def _HandleReply(self, up):
502 04cdf663 Guido Trotter
    """Handle a single confd reply, and decide whether to filter it.
503 04cdf663 Guido Trotter

504 04cdf663 Guido Trotter
    @rtype: boolean
505 04cdf663 Guido Trotter
    @return: True if the reply should be filtered, False if it should be passed
506 04cdf663 Guido Trotter
             on to the up-callback
507 04cdf663 Guido Trotter

508 04cdf663 Guido Trotter
    """
509 04cdf663 Guido Trotter
    if up.salt in self._answers:
510 04cdf663 Guido Trotter
      self._answers[up.salt] += 1
511 04cdf663 Guido Trotter
512 04cdf663 Guido Trotter
  def __call__(self, up):
513 04cdf663 Guido Trotter
    """Filtering callback
514 04cdf663 Guido Trotter

515 04cdf663 Guido Trotter
    @type up: L{ConfdUpcallPayload}
516 04cdf663 Guido Trotter
    @param up: upper callback
517 04cdf663 Guido Trotter

518 04cdf663 Guido Trotter
    """
519 04cdf663 Guido Trotter
    if up.type == UPCALL_REPLY:
520 04cdf663 Guido Trotter
      self._HandleReply(up)
521 04cdf663 Guido Trotter
    elif up.type == UPCALL_EXPIRE:
522 04cdf663 Guido Trotter
      self._HandleExpire(up)
523 04cdf663 Guido Trotter
    self._callback(up)
524 5b349fd1 Iustin Pop
525 71e114da Iustin Pop
526 5b349fd1 Iustin Pop
def GetConfdClient(callback):
527 5b349fd1 Iustin Pop
  """Return a client configured using the given callback.
528 5b349fd1 Iustin Pop

529 5b349fd1 Iustin Pop
  This is handy to abstract the MC list and HMAC key reading.
530 5b349fd1 Iustin Pop

531 5b349fd1 Iustin Pop
  @attention: This should only be called on nodes which are part of a
532 5b349fd1 Iustin Pop
      cluster, since it depends on a valid (ganeti) data directory;
533 5b349fd1 Iustin Pop
      for code running outside of a cluster, you need to create the
534 5b349fd1 Iustin Pop
      client manually
535 5b349fd1 Iustin Pop

536 5b349fd1 Iustin Pop
  """
537 5b349fd1 Iustin Pop
  ss = ssconf.SimpleStore()
538 5b349fd1 Iustin Pop
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
539 5b349fd1 Iustin Pop
  mc_list = utils.ReadFile(mc_file).splitlines()
540 5b349fd1 Iustin Pop
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
541 5b349fd1 Iustin Pop
  return ConfdClient(hmac_key, mc_list, callback)