Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ db169865

History | View | Annotate | Download (13 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitely initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63

    
64

    
65
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
66
  """Confd udp asyncore client
67

68
  This is kept separate from the main ConfdClient to make sure it's easy to
69
  implement a non-asyncore based client library.
70

71
  """
72
  def __init__(self, client):
73
    """Constructor for ConfdAsyncUDPClient
74

75
    @type client: L{ConfdClient}
76
    @param client: client library, to pass the datagrams to
77

78
    """
79
    daemon.AsyncUDPSocket.__init__(self)
80
    self.client = client
81

    
82
  # this method is overriding a daemon.AsyncUDPSocket method
83
  def handle_datagram(self, payload, ip, port):
84
    self.client.HandleResponse(payload, ip, port)
85

    
86

    
87
class ConfdClient:
88
  """Send queries to confd, and get back answers.
89

90
  Since the confd model works by querying multiple master candidates, and
91
  getting back answers, this is an asynchronous library. It can either work
92
  through asyncore or with your own handling.
93

94
  """
95
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
96
    """Constructor for ConfdClient
97

98
    @type hmac_key: string
99
    @param hmac_key: hmac key to talk to confd
100
    @type peers: list
101
    @param peers: list of peer nodes
102
    @type callback: f(L{ConfdUpcallPayload})
103
    @param callback: function to call when getting answers
104
    @type port: integer
105
    @keyword port: confd port (default: use GetDaemonPort)
106
    @type logger: logging.Logger
107
    @keyword logger: optional logger for internal conditions
108

109
    """
110
    if not callable(callback):
111
      raise errors.ProgrammerError("callback must be callable")
112

    
113
    self.UpdatePeerList(peers)
114
    self._hmac_key = hmac_key
115
    self._socket = ConfdAsyncUDPClient(self)
116
    self._callback = callback
117
    self._confd_port = port
118
    self._logger = logger
119
    self._requests = {}
120
    self._expire_requests = []
121

    
122
    if self._confd_port is None:
123
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
124

    
125
  def UpdatePeerList(self, peers):
126
    """Update the list of peers
127

128
    @type peers: list
129
    @param peers: list of peer nodes
130

131
    """
132
    # we are actually called from init, so:
133
    # pylint: disable-msg=W0201
134
    if not isinstance(peers, list):
135
      raise errors.ProgrammerError("peers must be a list")
136
    # make a copy of peers, since we're going to shuffle the list, later
137
    self._peers = list(peers)
138

    
139
  def _PackRequest(self, request, now=None):
140
    """Prepare a request to be sent on the wire.
141

142
    This function puts a proper salt in a confd request, puts the proper salt,
143
    and adds the correct magic number.
144

145
    """
146
    if now is None:
147
      now = time.time()
148
    tstamp = '%d' % now
149
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
150
    return confd.PackMagic(req)
151

    
152
  def _UnpackReply(self, payload):
153
    in_payload = confd.UnpackMagic(payload)
154
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
155
    answer = objects.ConfdReply.FromDict(dict_answer)
156
    return answer, salt
157

    
158
  def ExpireRequests(self):
159
    """Delete all the expired requests.
160

161
    """
162
    now = time.time()
163
    while self._expire_requests:
164
      expire_time, rsalt = self._expire_requests[0]
165
      if now >= expire_time:
166
        self._expire_requests.pop(0)
167
        (request, args) = self._requests[rsalt]
168
        del self._requests[rsalt]
169
        client_reply = ConfdUpcallPayload(salt=rsalt,
170
                                          type=UPCALL_EXPIRE,
171
                                          orig_request=request,
172
                                          extra_args=args,
173
                                          client=self,
174
                                          )
175
        self._callback(client_reply)
176
      else:
177
        break
178

    
179
  def SendRequest(self, request, args=None, coverage=None):
180
    """Send a confd request to some MCs
181

182
    @type request: L{objects.ConfdRequest}
183
    @param request: the request to send
184
    @type args: tuple
185
    @keyword args: additional callback arguments
186
    @type coverage: integer
187
    @keyword coverage: number of remote nodes to contact
188

189
    """
190
    if coverage is None:
191
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
192

    
193
    if coverage > len(self._peers):
194
      raise errors.ConfdClientError("Not enough MCs known to provide the"
195
                                    " desired coverage")
196

    
197
    if not request.rsalt:
198
      raise errors.ConfdClientError("Missing request rsalt")
199

    
200
    self.ExpireRequests()
201
    if request.rsalt in self._requests:
202
      raise errors.ConfdClientError("Duplicate request rsalt")
203

    
204
    if request.type not in constants.CONFD_REQS:
205
      raise errors.ConfdClientError("Invalid request type")
206

    
207
    random.shuffle(self._peers)
208
    targets = self._peers[:coverage]
209

    
210
    now = time.time()
211
    payload = self._PackRequest(request, now=now)
212

    
213
    for target in targets:
214
      try:
215
        self._socket.enqueue_send(target, self._confd_port, payload)
216
      except errors.UdpDataSizeError:
217
        raise errors.ConfdClientError("Request too big")
218

    
219
    self._requests[request.rsalt] = (request, args)
220
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
221
    self._expire_requests.append((expire_time, request.rsalt))
222

    
223
  def HandleResponse(self, payload, ip, port):
224
    """Asynchronous handler for a confd reply
225

226
    Call the relevant callback associated to the current request.
227

228
    """
229
    try:
230
      try:
231
        answer, salt = self._UnpackReply(payload)
232
      except (errors.SignatureError, errors.ConfdMagicError), err:
233
        if self._logger:
234
          self._logger.debug("Discarding broken package: %s" % err)
235
        return
236

    
237
      try:
238
        (request, args) = self._requests[salt]
239
      except KeyError:
240
        if self._logger:
241
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
242
        return
243

    
244
      client_reply = ConfdUpcallPayload(salt=salt,
245
                                        type=UPCALL_REPLY,
246
                                        server_reply=answer,
247
                                        orig_request=request,
248
                                        server_ip=ip,
249
                                        server_port=port,
250
                                        extra_args=args,
251
                                        client=self,
252
                                       )
253
      self._callback(client_reply)
254

    
255
    finally:
256
      self.ExpireRequests()
257

    
258

    
259
# UPCALL_REPLY: server reply upcall
260
# has all ConfdUpcallPayload fields populated
261
UPCALL_REPLY = 1
262
# UPCALL_EXPIRE: internal library request expire
263
# has only salt, type, orig_request and extra_args
264
UPCALL_EXPIRE = 2
265
CONFD_UPCALL_TYPES = frozenset([
266
  UPCALL_REPLY,
267
  UPCALL_EXPIRE,
268
  ])
269

    
270

    
271
class ConfdUpcallPayload(objects.ConfigObject):
272
  """Callback argument for confd replies
273

274
  @type salt: string
275
  @ivar salt: salt associated with the query
276
  @type type: one of confd.client.CONFD_UPCALL_TYPES
277
  @ivar type: upcall type (server reply, expired request, ...)
278
  @type orig_request: L{objects.ConfdRequest}
279
  @ivar orig_request: original request
280
  @type server_reply: L{objects.ConfdReply}
281
  @ivar server_reply: server reply
282
  @type server_ip: string
283
  @ivar server_ip: answering server ip address
284
  @type server_port: int
285
  @ivar server_port: answering server port
286
  @type extra_args: any
287
  @ivar extra_args: 'args' argument of the SendRequest function
288
  @type client: L{ConfdClient}
289
  @ivar client: current confd client instance
290

291
  """
292
  __slots__ = [
293
    "salt",
294
    "type",
295
    "orig_request",
296
    "server_reply",
297
    "server_ip",
298
    "server_port",
299
    "extra_args",
300
    "client",
301
    ]
302

    
303

    
304
class ConfdClientRequest(objects.ConfdRequest):
305
  """This is the client-side version of ConfdRequest.
306

307
  This version of the class helps creating requests, on the client side, by
308
  filling in some default values.
309

310
  """
311
  def __init__(self, **kwargs):
312
    objects.ConfdRequest.__init__(self, **kwargs)
313
    if not self.rsalt:
314
      self.rsalt = utils.NewUUID()
315
    if not self.protocol:
316
      self.protocol = constants.CONFD_PROTOCOL_VERSION
317
    if self.type not in constants.CONFD_REQS:
318
      raise errors.ConfdClientError("Invalid request type")
319

    
320

    
321
class ConfdFilterCallback:
322
  """Callback that calls another callback, but filters duplicate results.
323

324
  """
325
  def __init__(self, callback, logger=None):
326
    """Constructor for ConfdFilterCallback
327

328
    @type callback: f(L{ConfdUpcallPayload})
329
    @param callback: function to call when getting answers
330
    @type logger: logging.Logger
331
    @keyword logger: optional logger for internal conditions
332

333
    """
334
    if not callable(callback):
335
      raise errors.ProgrammerError("callback must be callable")
336

    
337
    self._callback = callback
338
    self._logger = logger
339
    # answers contains a dict of salt -> answer
340
    self._answers = {}
341

    
342
  def _LogFilter(self, salt, new_reply, old_reply):
343
    if not self._logger:
344
      return
345

    
346
    if new_reply.serial > old_reply.serial:
347
      self._logger.debug("Filtering confirming answer, with newer"
348
                         " serial for query %s" % salt)
349
    elif new_reply.serial == old_reply.serial:
350
      if new_reply.answer != old_reply.answer:
351
        self._logger.warning("Got incoherent answers for query %s"
352
                             " (serial: %s)" % (salt, new_reply.serial))
353
      else:
354
        self._logger.debug("Filtering confirming answer, with same"
355
                           " serial for query %s" % salt)
356
    else:
357
      self._logger.debug("Filtering outdated answer for query %s"
358
                         " serial: (%d < %d)" % (salt, old_reply.serial,
359
                                                 new_reply.serial))
360

    
361
  def _HandleExpire(self, up):
362
    # if we have no answer we have received none, before the expiration.
363
    if up.salt in self._answers:
364
      del self._answers[up.salt]
365

    
366
  def _HandleReply(self, up):
367
    """Handle a single confd reply, and decide whether to filter it.
368

369
    @rtype: boolean
370
    @return: True if the reply should be filtered, False if it should be passed
371
             on to the up-callback
372

373
    """
374
    filter_upcall = False
375
    salt = up.salt
376
    if salt not in self._answers:
377
      # first answer for a query (don't filter, and record)
378
      self._answers[salt] = up.server_reply
379
    elif up.server_reply.serial > self._answers[salt].serial:
380
      # newer answer (record, and compare contents)
381
      old_answer = self._answers[salt]
382
      self._answers[salt] = up.server_reply
383
      if up.server_reply.answer == old_answer.answer:
384
        # same content (filter) (version upgrade was unrelated)
385
        filter_upcall = True
386
        self._LogFilter(salt, up.server_reply, old_answer)
387
      # else: different content, pass up a second answer
388
    else:
389
      # older or same-version answer (duplicate or outdated, filter)
390
      filter_upcall = True
391
      self._LogFilter(salt, up.server_reply, self._answers[salt])
392

    
393
    return filter_upcall
394

    
395
  def __call__(self, up):
396
    """Filtering callback
397

398
    @type up: L{ConfdUpcallPayload}
399
    @param up: upper callback
400

401
    """
402
    filter_upcall = False
403
    if up.type == UPCALL_REPLY:
404
      filter_upcall = self._HandleReply(up)
405
    elif up.type == UPCALL_EXPIRE:
406
      self._HandleExpire(up)
407

    
408
    if not filter_upcall:
409
      self._callback(up)