Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 04cdf663

History | View | Annotate | Download (15.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitely initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63

    
64

    
65
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
66
  """Confd udp asyncore client
67

68
  This is kept separate from the main ConfdClient to make sure it's easy to
69
  implement a non-asyncore based client library.
70

71
  """
72
  def __init__(self, client):
73
    """Constructor for ConfdAsyncUDPClient
74

75
    @type client: L{ConfdClient}
76
    @param client: client library, to pass the datagrams to
77

78
    """
79
    daemon.AsyncUDPSocket.__init__(self)
80
    self.client = client
81

    
82
  # this method is overriding a daemon.AsyncUDPSocket method
83
  def handle_datagram(self, payload, ip, port):
84
    self.client.HandleResponse(payload, ip, port)
85

    
86

    
87
class ConfdClient:
88
  """Send queries to confd, and get back answers.
89

90
  Since the confd model works by querying multiple master candidates, and
91
  getting back answers, this is an asynchronous library. It can either work
92
  through asyncore or with your own handling.
93

94
  """
95
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
96
    """Constructor for ConfdClient
97

98
    @type hmac_key: string
99
    @param hmac_key: hmac key to talk to confd
100
    @type peers: list
101
    @param peers: list of peer nodes
102
    @type callback: f(L{ConfdUpcallPayload})
103
    @param callback: function to call when getting answers
104
    @type port: integer
105
    @param port: confd port (default: use GetDaemonPort)
106
    @type logger: logging.Logger
107
    @param logger: optional logger for internal conditions
108

109
    """
110
    if not callable(callback):
111
      raise errors.ProgrammerError("callback must be callable")
112

    
113
    self.UpdatePeerList(peers)
114
    self._hmac_key = hmac_key
115
    self._socket = ConfdAsyncUDPClient(self)
116
    self._callback = callback
117
    self._confd_port = port
118
    self._logger = logger
119
    self._requests = {}
120
    self._expire_requests = []
121

    
122
    if self._confd_port is None:
123
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
124

    
125
  def UpdatePeerList(self, peers):
126
    """Update the list of peers
127

128
    @type peers: list
129
    @param peers: list of peer nodes
130

131
    """
132
    # we are actually called from init, so:
133
    # pylint: disable-msg=W0201
134
    if not isinstance(peers, list):
135
      raise errors.ProgrammerError("peers must be a list")
136
    # make a copy of peers, since we're going to shuffle the list, later
137
    self._peers = list(peers)
138

    
139
  def _PackRequest(self, request, now=None):
140
    """Prepare a request to be sent on the wire.
141

142
    This function puts a proper salt in a confd request, puts the proper salt,
143
    and adds the correct magic number.
144

145
    """
146
    if now is None:
147
      now = time.time()
148
    tstamp = '%d' % now
149
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
150
    return confd.PackMagic(req)
151

    
152
  def _UnpackReply(self, payload):
153
    in_payload = confd.UnpackMagic(payload)
154
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
155
    answer = objects.ConfdReply.FromDict(dict_answer)
156
    return answer, salt
157

    
158
  def ExpireRequests(self):
159
    """Delete all the expired requests.
160

161
    """
162
    now = time.time()
163
    while self._expire_requests:
164
      expire_time, rsalt = self._expire_requests[0]
165
      if now >= expire_time:
166
        self._expire_requests.pop(0)
167
        (request, args) = self._requests[rsalt]
168
        del self._requests[rsalt]
169
        client_reply = ConfdUpcallPayload(salt=rsalt,
170
                                          type=UPCALL_EXPIRE,
171
                                          orig_request=request,
172
                                          extra_args=args,
173
                                          client=self,
174
                                          )
175
        self._callback(client_reply)
176
      else:
177
        break
178

    
179
  def SendRequest(self, request, args=None, coverage=None, async=True):
180
    """Send a confd request to some MCs
181

182
    @type request: L{objects.ConfdRequest}
183
    @param request: the request to send
184
    @type args: tuple
185
    @param args: additional callback arguments
186
    @type coverage: integer
187
    @param coverage: number of remote nodes to contact
188
    @type async: boolean
189
    @param async: handle the write asynchronously
190

191
    """
192
    if coverage is None:
193
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
194

    
195
    if coverage > len(self._peers):
196
      raise errors.ConfdClientError("Not enough MCs known to provide the"
197
                                    " desired coverage")
198

    
199
    if not request.rsalt:
200
      raise errors.ConfdClientError("Missing request rsalt")
201

    
202
    self.ExpireRequests()
203
    if request.rsalt in self._requests:
204
      raise errors.ConfdClientError("Duplicate request rsalt")
205

    
206
    if request.type not in constants.CONFD_REQS:
207
      raise errors.ConfdClientError("Invalid request type")
208

    
209
    random.shuffle(self._peers)
210
    targets = self._peers[:coverage]
211

    
212
    now = time.time()
213
    payload = self._PackRequest(request, now=now)
214

    
215
    for target in targets:
216
      try:
217
        self._socket.enqueue_send(target, self._confd_port, payload)
218
      except errors.UdpDataSizeError:
219
        raise errors.ConfdClientError("Request too big")
220

    
221
    self._requests[request.rsalt] = (request, args)
222
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
223
    self._expire_requests.append((expire_time, request.rsalt))
224

    
225
    if not async:
226
      self.FlushSendQueue()
227

    
228
  def HandleResponse(self, payload, ip, port):
229
    """Asynchronous handler for a confd reply
230

231
    Call the relevant callback associated to the current request.
232

233
    """
234
    try:
235
      try:
236
        answer, salt = self._UnpackReply(payload)
237
      except (errors.SignatureError, errors.ConfdMagicError), err:
238
        if self._logger:
239
          self._logger.debug("Discarding broken package: %s" % err)
240
        return
241

    
242
      try:
243
        (request, args) = self._requests[salt]
244
      except KeyError:
245
        if self._logger:
246
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
247
        return
248

    
249
      client_reply = ConfdUpcallPayload(salt=salt,
250
                                        type=UPCALL_REPLY,
251
                                        server_reply=answer,
252
                                        orig_request=request,
253
                                        server_ip=ip,
254
                                        server_port=port,
255
                                        extra_args=args,
256
                                        client=self,
257
                                       )
258
      self._callback(client_reply)
259

    
260
    finally:
261
      self.ExpireRequests()
262

    
263
  def FlushSendQueue(self):
264
    """Send out all pending requests.
265

266
    Can be used for synchronous client use.
267

268
    """
269
    while self._socket.writable():
270
      self._socket.handle_write()
271

    
272
  def ReceiveReply(self, timeout=1):
273
    """Receive one reply.
274

275
    @type timeout: float
276
    @param timeout: how long to wait for the reply
277
    @rtype: boolean
278
    @return: True if some data has been handled, False otherwise
279

280
    """
281
    return self._socket.process_next_packet(timeout=timeout)
282

    
283

    
284
# UPCALL_REPLY: server reply upcall
285
# has all ConfdUpcallPayload fields populated
286
UPCALL_REPLY = 1
287
# UPCALL_EXPIRE: internal library request expire
288
# has only salt, type, orig_request and extra_args
289
UPCALL_EXPIRE = 2
290
CONFD_UPCALL_TYPES = frozenset([
291
  UPCALL_REPLY,
292
  UPCALL_EXPIRE,
293
  ])
294

    
295

    
296
class ConfdUpcallPayload(objects.ConfigObject):
297
  """Callback argument for confd replies
298

299
  @type salt: string
300
  @ivar salt: salt associated with the query
301
  @type type: one of confd.client.CONFD_UPCALL_TYPES
302
  @ivar type: upcall type (server reply, expired request, ...)
303
  @type orig_request: L{objects.ConfdRequest}
304
  @ivar orig_request: original request
305
  @type server_reply: L{objects.ConfdReply}
306
  @ivar server_reply: server reply
307
  @type server_ip: string
308
  @ivar server_ip: answering server ip address
309
  @type server_port: int
310
  @ivar server_port: answering server port
311
  @type extra_args: any
312
  @ivar extra_args: 'args' argument of the SendRequest function
313
  @type client: L{ConfdClient}
314
  @ivar client: current confd client instance
315

316
  """
317
  __slots__ = [
318
    "salt",
319
    "type",
320
    "orig_request",
321
    "server_reply",
322
    "server_ip",
323
    "server_port",
324
    "extra_args",
325
    "client",
326
    ]
327

    
328

    
329
class ConfdClientRequest(objects.ConfdRequest):
330
  """This is the client-side version of ConfdRequest.
331

332
  This version of the class helps creating requests, on the client side, by
333
  filling in some default values.
334

335
  """
336
  def __init__(self, **kwargs):
337
    objects.ConfdRequest.__init__(self, **kwargs)
338
    if not self.rsalt:
339
      self.rsalt = utils.NewUUID()
340
    if not self.protocol:
341
      self.protocol = constants.CONFD_PROTOCOL_VERSION
342
    if self.type not in constants.CONFD_REQS:
343
      raise errors.ConfdClientError("Invalid request type")
344

    
345

    
346
class ConfdFilterCallback:
347
  """Callback that calls another callback, but filters duplicate results.
348

349
  """
350
  def __init__(self, callback, logger=None):
351
    """Constructor for ConfdFilterCallback
352

353
    @type callback: f(L{ConfdUpcallPayload})
354
    @param callback: function to call when getting answers
355
    @type logger: logging.Logger
356
    @param logger: optional logger for internal conditions
357

358
    """
359
    if not callable(callback):
360
      raise errors.ProgrammerError("callback must be callable")
361

    
362
    self._callback = callback
363
    self._logger = logger
364
    # answers contains a dict of salt -> answer
365
    self._answers = {}
366

    
367
  def _LogFilter(self, salt, new_reply, old_reply):
368
    if not self._logger:
369
      return
370

    
371
    if new_reply.serial > old_reply.serial:
372
      self._logger.debug("Filtering confirming answer, with newer"
373
                         " serial for query %s" % salt)
374
    elif new_reply.serial == old_reply.serial:
375
      if new_reply.answer != old_reply.answer:
376
        self._logger.warning("Got incoherent answers for query %s"
377
                             " (serial: %s)" % (salt, new_reply.serial))
378
      else:
379
        self._logger.debug("Filtering confirming answer, with same"
380
                           " serial for query %s" % salt)
381
    else:
382
      self._logger.debug("Filtering outdated answer for query %s"
383
                         " serial: (%d < %d)" % (salt, old_reply.serial,
384
                                                 new_reply.serial))
385

    
386
  def _HandleExpire(self, up):
387
    # if we have no answer we have received none, before the expiration.
388
    if up.salt in self._answers:
389
      del self._answers[up.salt]
390

    
391
  def _HandleReply(self, up):
392
    """Handle a single confd reply, and decide whether to filter it.
393

394
    @rtype: boolean
395
    @return: True if the reply should be filtered, False if it should be passed
396
             on to the up-callback
397

398
    """
399
    filter_upcall = False
400
    salt = up.salt
401
    if salt not in self._answers:
402
      # first answer for a query (don't filter, and record)
403
      self._answers[salt] = up.server_reply
404
    elif up.server_reply.serial > self._answers[salt].serial:
405
      # newer answer (record, and compare contents)
406
      old_answer = self._answers[salt]
407
      self._answers[salt] = up.server_reply
408
      if up.server_reply.answer == old_answer.answer:
409
        # same content (filter) (version upgrade was unrelated)
410
        filter_upcall = True
411
        self._LogFilter(salt, up.server_reply, old_answer)
412
      # else: different content, pass up a second answer
413
    else:
414
      # older or same-version answer (duplicate or outdated, filter)
415
      filter_upcall = True
416
      self._LogFilter(salt, up.server_reply, self._answers[salt])
417

    
418
    return filter_upcall
419

    
420
  def __call__(self, up):
421
    """Filtering callback
422

423
    @type up: L{ConfdUpcallPayload}
424
    @param up: upper callback
425

426
    """
427
    filter_upcall = False
428
    if up.type == UPCALL_REPLY:
429
      filter_upcall = self._HandleReply(up)
430
    elif up.type == UPCALL_EXPIRE:
431
      self._HandleExpire(up)
432

    
433
    if not filter_upcall:
434
      self._callback(up)
435

    
436

    
437
class ConfdCountingCallback:
438
  """Callback that calls another callback, and counts the answers
439

440
  """
441
  def __init__(self, callback, logger=None):
442
    """Constructor for ConfdCountingCallback
443

444
    @type callback: f(L{ConfdUpcallPayload})
445
    @param callback: function to call when getting answers
446
    @type logger: logging.Logger
447
    @param logger: optional logger for internal conditions
448

449
    """
450
    if not callable(callback):
451
      raise errors.ProgrammerError("callback must be callable")
452

    
453
    self._callback = callback
454
    self._logger = logger
455
    # answers contains a dict of salt -> count
456
    self._answers = {}
457

    
458
  def RegisterQuery(self, salt):
459
    if salt in self._answers:
460
      raise errors.ProgrammerError("query already registered")
461
    self._answers[salt] = 0
462

    
463
  def AllAnswered(self):
464
    """Have all the registered queries received at least an answer?
465

466
    """
467
    return utils.all(self._answers.values())
468

    
469
  def _HandleExpire(self, up):
470
    # if we have no answer we have received none, before the expiration.
471
    if up.salt in self._answers:
472
      del self._answers[up.salt]
473

    
474
  def _HandleReply(self, up):
475
    """Handle a single confd reply, and decide whether to filter it.
476

477
    @rtype: boolean
478
    @return: True if the reply should be filtered, False if it should be passed
479
             on to the up-callback
480

481
    """
482
    if up.salt in self._answers:
483
      self._answers[up.salt] += 1
484

    
485
  def __call__(self, up):
486
    """Filtering callback
487

488
    @type up: L{ConfdUpcallPayload}
489
    @param up: upper callback
490

491
    """
492
    if up.type == UPCALL_REPLY:
493
      self._HandleReply(up)
494
    elif up.type == UPCALL_EXPIRE:
495
      self._HandleExpire(up)
496
    self._callback(up)