Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 30e4e741

History | View | Annotate | Download (12.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitely initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63

    
64

    
65
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
66
  """Confd udp asyncore client
67

68
  This is kept separate from the main ConfdClient to make sure it's easy to
69
  implement a non-asyncore based client library.
70

71
  """
72
  def __init__(self, client):
73
    """Constructor for ConfdAsyncUDPClient
74

75
    @type client: L{ConfdClient}
76
    @param client: client library, to pass the datagrams to
77

78
    """
79
    daemon.AsyncUDPSocket.__init__(self)
80
    self.client = client
81

    
82
  # this method is overriding a daemon.AsyncUDPSocket method
83
  def handle_datagram(self, payload, ip, port):
84
    self.client.HandleResponse(payload, ip, port)
85

    
86

    
87
class ConfdClient:
88
  """Send queries to confd, and get back answers.
89

90
  Since the confd model works by querying multiple master candidates, and
91
  getting back answers, this is an asynchronous library. It can either work
92
  through asyncore or with your own handling.
93

94
  """
95
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
96
    """Constructor for ConfdClient
97

98
    @type hmac_key: string
99
    @param hmac_key: hmac key to talk to confd
100
    @type peers: list
101
    @param peers: list of peer nodes
102
    @type callback: f(L{ConfdUpcallPayload})
103
    @param callback: function to call when getting answers
104
    @type port: integer
105
    @keyword port: confd port (default: use GetDaemonPort)
106
    @type logger: logging.Logger
107
    @keyword logger: optional logger for internal conditions
108

109
    """
110
    if not callable(callback):
111
      raise errors.ProgrammerError("callback must be callable")
112

    
113
    self.UpdatePeerList(peers)
114
    self._hmac_key = hmac_key
115
    self._socket = ConfdAsyncUDPClient(self)
116
    self._callback = callback
117
    self._confd_port = port
118
    self._logger = logger
119
    self._requests = {}
120
    self._expire_requests = []
121

    
122
    if self._confd_port is None:
123
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
124

    
125
  def UpdatePeerList(self, peers):
126
    """Update the list of peers
127

128
    @type peers: list
129
    @param peers: list of peer nodes
130

131
    """
132
    # we are actually called from init, so:
133
    # pylint: disable-msg=W0201
134
    if not isinstance(peers, list):
135
      raise errors.ProgrammerError("peers must be a list")
136
    self._peers = peers
137

    
138
  def _PackRequest(self, request, now=None):
139
    """Prepare a request to be sent on the wire.
140

141
    This function puts a proper salt in a confd request, puts the proper salt,
142
    and adds the correct magic number.
143

144
    """
145
    if now is None:
146
      now = time.time()
147
    tstamp = '%d' % now
148
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
149
    return confd.PackMagic(req)
150

    
151
  def _UnpackReply(self, payload):
152
    in_payload = confd.UnpackMagic(payload)
153
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
154
    answer = objects.ConfdReply.FromDict(dict_answer)
155
    return answer, salt
156

    
157
  def ExpireRequests(self):
158
    """Delete all the expired requests.
159

160
    """
161
    now = time.time()
162
    while self._expire_requests:
163
      expire_time, rsalt = self._expire_requests[0]
164
      if now >= expire_time:
165
        self._expire_requests.pop(0)
166
        (request, args) = self._requests[rsalt]
167
        del self._requests[rsalt]
168
        client_reply = ConfdUpcallPayload(salt=rsalt,
169
                                          type=UPCALL_EXPIRE,
170
                                          orig_request=request,
171
                                          extra_args=args,
172
                                          client=self,
173
                                          )
174
        self._callback(client_reply)
175
      else:
176
        break
177

    
178
  def SendRequest(self, request, args=None, coverage=None):
179
    """Send a confd request to some MCs
180

181
    @type request: L{objects.ConfdRequest}
182
    @param request: the request to send
183
    @type args: tuple
184
    @keyword args: additional callback arguments
185
    @type coverage: integer
186
    @keyword coverage: number of remote nodes to contact
187

188
    """
189
    if coverage is None:
190
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
191

    
192
    if coverage > len(self._peers):
193
      raise errors.ConfdClientError("Not enough MCs known to provide the"
194
                                    " desired coverage")
195

    
196
    if not request.rsalt:
197
      raise errors.ConfdClientError("Missing request rsalt")
198

    
199
    self.ExpireRequests()
200
    if request.rsalt in self._requests:
201
      raise errors.ConfdClientError("Duplicate request rsalt")
202

    
203
    if request.type not in constants.CONFD_REQS:
204
      raise errors.ConfdClientError("Invalid request type")
205

    
206
    random.shuffle(self._peers)
207
    targets = self._peers[:coverage]
208

    
209
    now = time.time()
210
    payload = self._PackRequest(request, now=now)
211

    
212
    for target in targets:
213
      try:
214
        self._socket.enqueue_send(target, self._confd_port, payload)
215
      except errors.UdpDataSizeError:
216
        raise errors.ConfdClientError("Request too big")
217

    
218
    self._requests[request.rsalt] = (request, args)
219
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
220
    self._expire_requests.append((expire_time, request.rsalt))
221

    
222
  def HandleResponse(self, payload, ip, port):
223
    """Asynchronous handler for a confd reply
224

225
    Call the relevant callback associated to the current request.
226

227
    """
228
    try:
229
      try:
230
        answer, salt = self._UnpackReply(payload)
231
      except (errors.SignatureError, errors.ConfdMagicError), err:
232
        if self._logger:
233
          self._logger.debug("Discarding broken package: %s" % err)
234
        return
235

    
236
      try:
237
        (request, args) = self._requests[salt]
238
      except KeyError:
239
        if self._logger:
240
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
241
        return
242

    
243
      client_reply = ConfdUpcallPayload(salt=salt,
244
                                        type=UPCALL_REPLY,
245
                                        server_reply=answer,
246
                                        orig_request=request,
247
                                        server_ip=ip,
248
                                        server_port=port,
249
                                        extra_args=args,
250
                                        client=self,
251
                                       )
252
      self._callback(client_reply)
253

    
254
    finally:
255
      self.ExpireRequests()
256

    
257

    
258
# UPCALL_REPLY: server reply upcall
259
# has all ConfdUpcallPayload fields populated
260
UPCALL_REPLY = 1
261
# UPCALL_EXPIRE: internal library request expire
262
# has only salt, type, orig_request and extra_args
263
UPCALL_EXPIRE = 2
264
CONFD_UPCALL_TYPES = frozenset([
265
  UPCALL_REPLY,
266
  UPCALL_EXPIRE,
267
  ])
268

    
269

    
270
class ConfdUpcallPayload(objects.ConfigObject):
271
  """Callback argument for confd replies
272

273
  @type salt: string
274
  @ivar salt: salt associated with the query
275
  @type type: one of confd.client.CONFD_UPCALL_TYPES
276
  @ivar type: upcall type (server reply, expired request, ...)
277
  @type orig_request: L{objects.ConfdRequest}
278
  @ivar orig_request: original request
279
  @type server_reply: L{objects.ConfdReply}
280
  @ivar server_reply: server reply
281
  @type server_ip: string
282
  @ivar server_ip: answering server ip address
283
  @type server_port: int
284
  @ivar server_port: answering server port
285
  @type extra_args: any
286
  @ivar extra_args: 'args' argument of the SendRequest function
287
  @type client: L{ConfdClient}
288
  @ivar client: current confd client instance
289

290
  """
291
  __slots__ = [
292
    "salt",
293
    "type",
294
    "orig_request",
295
    "server_reply",
296
    "server_ip",
297
    "server_port",
298
    "extra_args",
299
    "client",
300
    ]
301

    
302

    
303
class ConfdClientRequest(objects.ConfdRequest):
304
  """This is the client-side version of ConfdRequest.
305

306
  This version of the class helps creating requests, on the client side, by
307
  filling in some default values.
308

309
  """
310
  def __init__(self, **kwargs):
311
    objects.ConfdRequest.__init__(self, **kwargs)
312
    if not self.rsalt:
313
      self.rsalt = utils.NewUUID()
314
    if not self.protocol:
315
      self.protocol = constants.CONFD_PROTOCOL_VERSION
316
    if self.type not in constants.CONFD_REQS:
317
      raise errors.ConfdClientError("Invalid request type")
318

    
319

    
320
class ConfdFilterCallback:
321
  """Callback that calls another callback, but filters duplicate results.
322

323
  """
324
  def __init__(self, callback, logger=None):
325
    """Constructor for ConfdFilterCallback
326

327
    @type callback: f(L{ConfdUpcallPayload})
328
    @param callback: function to call when getting answers
329
    @type logger: logging.Logger
330
    @keyword logger: optional logger for internal conditions
331

332
    """
333
    if not callable(callback):
334
      raise errors.ProgrammerError("callback must be callable")
335

    
336
    self._callback = callback
337
    self._logger = logger
338
    # answers contains a dict of salt -> answer
339
    self._answers = {}
340

    
341
  def _LogFilter(self, salt, new_reply, old_reply):
342
    if not self._logger:
343
      return
344

    
345
    if new_reply.serial > old_reply.serial:
346
      self._logger.debug("Filtering confirming answer, with newer"
347
                         " serial for query %s" % salt)
348
    elif new_reply.serial == old_reply.serial:
349
      if new_reply.answer != old_reply.answer:
350
        self._logger.warning("Got incoherent answers for query %s"
351
                             " (serial: %s)" % (salt, new_reply.serial))
352
      else:
353
        self._logger.debug("Filtering confirming answer, with same"
354
                           " serial for query %s" % salt)
355
    else:
356
      self._logger.debug("Filtering outdated answer for query %s"
357
                         " serial: (%d < %d)" % (salt, old_reply.serial,
358
                                                 new_reply.serial))
359

    
360
  def _HandleExpire(self, up):
361
    # if we have no answer we have received none, before the expiration.
362
    if up.salt in self._answers:
363
      del self._answers[up.salt]
364

    
365
  def _HandleReply(self, up):
366
    """Handle a single confd reply, and decide whether to filter it.
367

368
    @rtype: boolean
369
    @return: True if the reply should be filtered, False if it should be passed
370
             on to the up-callback
371

372
    """
373
    filter_upcall = False
374
    salt = up.salt
375
    if salt not in self._answers:
376
      # first answer for a query (don't filter, and record)
377
      self._answers[salt] = up.server_reply
378
    elif up.server_reply.serial > self._answers[salt].serial:
379
      # newer answer (record, and compare contents)
380
      old_answer = self._answers[salt]
381
      self._answers[salt] = up.server_reply
382
      if up.server_reply.answer == old_answer.answer:
383
        # same content (filter) (version upgrade was unrelated)
384
        filter_upcall = True
385
        self._LogFilter(salt, up.server_reply, old_answer)
386
      # else: different content, pass up a second answer
387
    else:
388
      # older or same-version answer (duplicate or outdated, filter)
389
      filter_upcall = True
390
      self._LogFilter(salt, up.server_reply, self._answers[salt])
391

    
392
    return filter_upcall
393

    
394
  def __call__(self, up):
395
    """Filtering callback
396

397
    @type up: L{ConfdUpcallPayload}
398
    @param up: upper callback
399

400
    """
401
    filter_upcall = False
402
    if up.type == UPCALL_REPLY:
403
      filter_upcall = self._HandleReply(up)
404
    elif up.type == UPCALL_EXPIRE:
405
      self._HandleExpire(up)
406

    
407
    if not filter_upcall:
408
      self._callback(up)