Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 39292d3a

History | View | Annotate | Download (16.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitely initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64

    
65

    
66
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
67
  """Confd udp asyncore client
68

69
  This is kept separate from the main ConfdClient to make sure it's easy to
70
  implement a non-asyncore based client library.
71

72
  """
73
  def __init__(self, client):
74
    """Constructor for ConfdAsyncUDPClient
75

76
    @type client: L{ConfdClient}
77
    @param client: client library, to pass the datagrams to
78

79
    """
80
    daemon.AsyncUDPSocket.__init__(self)
81
    self.client = client
82

    
83
  # this method is overriding a daemon.AsyncUDPSocket method
84
  def handle_datagram(self, payload, ip, port):
85
    self.client.HandleResponse(payload, ip, port)
86

    
87

    
88
class ConfdClient:
89
  """Send queries to confd, and get back answers.
90

91
  Since the confd model works by querying multiple master candidates, and
92
  getting back answers, this is an asynchronous library. It can either work
93
  through asyncore or with your own handling.
94

95
  """
96
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
97
    """Constructor for ConfdClient
98

99
    @type hmac_key: string
100
    @param hmac_key: hmac key to talk to confd
101
    @type peers: list
102
    @param peers: list of peer nodes
103
    @type callback: f(L{ConfdUpcallPayload})
104
    @param callback: function to call when getting answers
105
    @type port: integer
106
    @param port: confd port (default: use GetDaemonPort)
107
    @type logger: logging.Logger
108
    @param logger: optional logger for internal conditions
109

110
    """
111
    if not callable(callback):
112
      raise errors.ProgrammerError("callback must be callable")
113

    
114
    self.UpdatePeerList(peers)
115
    self._hmac_key = hmac_key
116
    self._socket = ConfdAsyncUDPClient(self)
117
    self._callback = callback
118
    self._confd_port = port
119
    self._logger = logger
120
    self._requests = {}
121
    self._expire_requests = []
122

    
123
    if self._confd_port is None:
124
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
125

    
126
  def UpdatePeerList(self, peers):
127
    """Update the list of peers
128

129
    @type peers: list
130
    @param peers: list of peer nodes
131

132
    """
133
    # we are actually called from init, so:
134
    # pylint: disable-msg=W0201
135
    if not isinstance(peers, list):
136
      raise errors.ProgrammerError("peers must be a list")
137
    # make a copy of peers, since we're going to shuffle the list, later
138
    self._peers = list(peers)
139

    
140
  def _PackRequest(self, request, now=None):
141
    """Prepare a request to be sent on the wire.
142

143
    This function puts a proper salt in a confd request, puts the proper salt,
144
    and adds the correct magic number.
145

146
    """
147
    if now is None:
148
      now = time.time()
149
    tstamp = '%d' % now
150
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
151
    return confd.PackMagic(req)
152

    
153
  def _UnpackReply(self, payload):
154
    in_payload = confd.UnpackMagic(payload)
155
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
156
    answer = objects.ConfdReply.FromDict(dict_answer)
157
    return answer, salt
158

    
159
  def ExpireRequests(self):
160
    """Delete all the expired requests.
161

162
    """
163
    now = time.time()
164
    while self._expire_requests:
165
      expire_time, rsalt = self._expire_requests[0]
166
      if now >= expire_time:
167
        self._expire_requests.pop(0)
168
        (request, args) = self._requests[rsalt]
169
        del self._requests[rsalt]
170
        client_reply = ConfdUpcallPayload(salt=rsalt,
171
                                          type=UPCALL_EXPIRE,
172
                                          orig_request=request,
173
                                          extra_args=args,
174
                                          client=self,
175
                                          )
176
        self._callback(client_reply)
177
      else:
178
        break
179

    
180
  def SendRequest(self, request, args=None, coverage=None, async=True):
181
    """Send a confd request to some MCs
182

183
    @type request: L{objects.ConfdRequest}
184
    @param request: the request to send
185
    @type args: tuple
186
    @param args: additional callback arguments
187
    @type coverage: integer
188
    @param coverage: number of remote nodes to contact
189
    @type async: boolean
190
    @param async: handle the write asynchronously
191

192
    """
193
    if coverage is None:
194
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
195

    
196
    if coverage > len(self._peers):
197
      raise errors.ConfdClientError("Not enough MCs known to provide the"
198
                                    " desired coverage")
199

    
200
    if not request.rsalt:
201
      raise errors.ConfdClientError("Missing request rsalt")
202

    
203
    self.ExpireRequests()
204
    if request.rsalt in self._requests:
205
      raise errors.ConfdClientError("Duplicate request rsalt")
206

    
207
    if request.type not in constants.CONFD_REQS:
208
      raise errors.ConfdClientError("Invalid request type")
209

    
210
    random.shuffle(self._peers)
211
    targets = self._peers[:coverage]
212

    
213
    now = time.time()
214
    payload = self._PackRequest(request, now=now)
215

    
216
    for target in targets:
217
      try:
218
        self._socket.enqueue_send(target, self._confd_port, payload)
219
      except errors.UdpDataSizeError:
220
        raise errors.ConfdClientError("Request too big")
221

    
222
    self._requests[request.rsalt] = (request, args)
223
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
224
    self._expire_requests.append((expire_time, request.rsalt))
225

    
226
    if not async:
227
      self.FlushSendQueue()
228

    
229
  def HandleResponse(self, payload, ip, port):
230
    """Asynchronous handler for a confd reply
231

232
    Call the relevant callback associated to the current request.
233

234
    """
235
    try:
236
      try:
237
        answer, salt = self._UnpackReply(payload)
238
      except (errors.SignatureError, errors.ConfdMagicError), err:
239
        if self._logger:
240
          self._logger.debug("Discarding broken package: %s" % err)
241
        return
242

    
243
      try:
244
        (request, args) = self._requests[salt]
245
      except KeyError:
246
        if self._logger:
247
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
248
        return
249

    
250
      client_reply = ConfdUpcallPayload(salt=salt,
251
                                        type=UPCALL_REPLY,
252
                                        server_reply=answer,
253
                                        orig_request=request,
254
                                        server_ip=ip,
255
                                        server_port=port,
256
                                        extra_args=args,
257
                                        client=self,
258
                                       )
259
      self._callback(client_reply)
260

    
261
    finally:
262
      self.ExpireRequests()
263

    
264
  def FlushSendQueue(self):
265
    """Send out all pending requests.
266

267
    Can be used for synchronous client use.
268

269
    """
270
    while self._socket.writable():
271
      self._socket.handle_write()
272

    
273
  def ReceiveReply(self, timeout=1):
274
    """Receive one reply.
275

276
    @type timeout: float
277
    @param timeout: how long to wait for the reply
278
    @rtype: boolean
279
    @return: True if some data has been handled, False otherwise
280

281
    """
282
    return self._socket.process_next_packet(timeout=timeout)
283

    
284

    
285
# UPCALL_REPLY: server reply upcall
286
# has all ConfdUpcallPayload fields populated
287
UPCALL_REPLY = 1
288
# UPCALL_EXPIRE: internal library request expire
289
# has only salt, type, orig_request and extra_args
290
UPCALL_EXPIRE = 2
291
CONFD_UPCALL_TYPES = frozenset([
292
  UPCALL_REPLY,
293
  UPCALL_EXPIRE,
294
  ])
295

    
296

    
297
class ConfdUpcallPayload(objects.ConfigObject):
298
  """Callback argument for confd replies
299

300
  @type salt: string
301
  @ivar salt: salt associated with the query
302
  @type type: one of confd.client.CONFD_UPCALL_TYPES
303
  @ivar type: upcall type (server reply, expired request, ...)
304
  @type orig_request: L{objects.ConfdRequest}
305
  @ivar orig_request: original request
306
  @type server_reply: L{objects.ConfdReply}
307
  @ivar server_reply: server reply
308
  @type server_ip: string
309
  @ivar server_ip: answering server ip address
310
  @type server_port: int
311
  @ivar server_port: answering server port
312
  @type extra_args: any
313
  @ivar extra_args: 'args' argument of the SendRequest function
314
  @type client: L{ConfdClient}
315
  @ivar client: current confd client instance
316

317
  """
318
  __slots__ = [
319
    "salt",
320
    "type",
321
    "orig_request",
322
    "server_reply",
323
    "server_ip",
324
    "server_port",
325
    "extra_args",
326
    "client",
327
    ]
328

    
329

    
330
class ConfdClientRequest(objects.ConfdRequest):
331
  """This is the client-side version of ConfdRequest.
332

333
  This version of the class helps creating requests, on the client side, by
334
  filling in some default values.
335

336
  """
337
  def __init__(self, **kwargs):
338
    objects.ConfdRequest.__init__(self, **kwargs)
339
    if not self.rsalt:
340
      self.rsalt = utils.NewUUID()
341
    if not self.protocol:
342
      self.protocol = constants.CONFD_PROTOCOL_VERSION
343
    if self.type not in constants.CONFD_REQS:
344
      raise errors.ConfdClientError("Invalid request type")
345

    
346

    
347
class ConfdFilterCallback:
348
  """Callback that calls another callback, but filters duplicate results.
349

350
  @ivar consistent: a dictionary indexed by salt; for each salt, if
351
      all responses ware identical, this will be True; this is the
352
      expected state on a healthy cluster; on inconsistent or
353
      partitioned clusters, this might be False, if we see answers
354
      with the same serial but different contents
355

356
  """
357
  def __init__(self, callback, logger=None):
358
    """Constructor for ConfdFilterCallback
359

360
    @type callback: f(L{ConfdUpcallPayload})
361
    @param callback: function to call when getting answers
362
    @type logger: logging.Logger
363
    @param logger: optional logger for internal conditions
364

365
    """
366
    if not callable(callback):
367
      raise errors.ProgrammerError("callback must be callable")
368

    
369
    self._callback = callback
370
    self._logger = logger
371
    # answers contains a dict of salt -> answer
372
    self._answers = {}
373
    self.consistent = {}
374

    
375
  def _LogFilter(self, salt, new_reply, old_reply):
376
    if not self._logger:
377
      return
378

    
379
    if new_reply.serial > old_reply.serial:
380
      self._logger.debug("Filtering confirming answer, with newer"
381
                         " serial for query %s" % salt)
382
    elif new_reply.serial == old_reply.serial:
383
      if new_reply.answer != old_reply.answer:
384
        self._logger.warning("Got incoherent answers for query %s"
385
                             " (serial: %s)" % (salt, new_reply.serial))
386
      else:
387
        self._logger.debug("Filtering confirming answer, with same"
388
                           " serial for query %s" % salt)
389
    else:
390
      self._logger.debug("Filtering outdated answer for query %s"
391
                         " serial: (%d < %d)" % (salt, old_reply.serial,
392
                                                 new_reply.serial))
393

    
394
  def _HandleExpire(self, up):
395
    # if we have no answer we have received none, before the expiration.
396
    if up.salt in self._answers:
397
      del self._answers[up.salt]
398
    if up.salt in self.consistent:
399
      del self.consistent[up.salt]
400

    
401
  def _HandleReply(self, up):
402
    """Handle a single confd reply, and decide whether to filter it.
403

404
    @rtype: boolean
405
    @return: True if the reply should be filtered, False if it should be passed
406
             on to the up-callback
407

408
    """
409
    filter_upcall = False
410
    salt = up.salt
411
    if salt not in self.consistent:
412
      self.consistent[salt] = True
413
    if salt not in self._answers:
414
      # first answer for a query (don't filter, and record)
415
      self._answers[salt] = up.server_reply
416
    elif up.server_reply.serial > self._answers[salt].serial:
417
      # newer answer (record, and compare contents)
418
      old_answer = self._answers[salt]
419
      self._answers[salt] = up.server_reply
420
      if up.server_reply.answer == old_answer.answer:
421
        # same content (filter) (version upgrade was unrelated)
422
        filter_upcall = True
423
        self._LogFilter(salt, up.server_reply, old_answer)
424
      # else: different content, pass up a second answer
425
    else:
426
      # older or same-version answer (duplicate or outdated, filter)
427
      if (up.server_reply.serial == self._answers[salt].serial and
428
          up.server_reply.answer != self._answers[salt].answer):
429
        self.consistent[salt] = False
430
      filter_upcall = True
431
      self._LogFilter(salt, up.server_reply, self._answers[salt])
432

    
433
    return filter_upcall
434

    
435
  def __call__(self, up):
436
    """Filtering callback
437

438
    @type up: L{ConfdUpcallPayload}
439
    @param up: upper callback
440

441
    """
442
    filter_upcall = False
443
    if up.type == UPCALL_REPLY:
444
      filter_upcall = self._HandleReply(up)
445
    elif up.type == UPCALL_EXPIRE:
446
      self._HandleExpire(up)
447

    
448
    if not filter_upcall:
449
      self._callback(up)
450

    
451

    
452
class ConfdCountingCallback:
453
  """Callback that calls another callback, and counts the answers
454

455
  """
456
  def __init__(self, callback, logger=None):
457
    """Constructor for ConfdCountingCallback
458

459
    @type callback: f(L{ConfdUpcallPayload})
460
    @param callback: function to call when getting answers
461
    @type logger: logging.Logger
462
    @param logger: optional logger for internal conditions
463

464
    """
465
    if not callable(callback):
466
      raise errors.ProgrammerError("callback must be callable")
467

    
468
    self._callback = callback
469
    self._logger = logger
470
    # answers contains a dict of salt -> count
471
    self._answers = {}
472

    
473
  def RegisterQuery(self, salt):
474
    if salt in self._answers:
475
      raise errors.ProgrammerError("query already registered")
476
    self._answers[salt] = 0
477

    
478
  def AllAnswered(self):
479
    """Have all the registered queries received at least an answer?
480

481
    """
482
    return utils.all(self._answers.values())
483

    
484
  def _HandleExpire(self, up):
485
    # if we have no answer we have received none, before the expiration.
486
    if up.salt in self._answers:
487
      del self._answers[up.salt]
488

    
489
  def _HandleReply(self, up):
490
    """Handle a single confd reply, and decide whether to filter it.
491

492
    @rtype: boolean
493
    @return: True if the reply should be filtered, False if it should be passed
494
             on to the up-callback
495

496
    """
497
    if up.salt in self._answers:
498
      self._answers[up.salt] += 1
499

    
500
  def __call__(self, up):
501
    """Filtering callback
502

503
    @type up: L{ConfdUpcallPayload}
504
    @param up: upper callback
505

506
    """
507
    if up.type == UPCALL_REPLY:
508
      self._HandleReply(up)
509
    elif up.type == UPCALL_EXPIRE:
510
      self._HandleExpire(up)
511
    self._callback(up)
512

    
513
def GetConfdClient(callback):
514
  """Return a client configured using the given callback.
515

516
  This is handy to abstract the MC list and HMAC key reading.
517

518
  @attention: This should only be called on nodes which are part of a
519
      cluster, since it depends on a valid (ganeti) data directory;
520
      for code running outside of a cluster, you need to create the
521
      client manually
522

523
  """
524
  ss = ssconf.SimpleStore()
525
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
526
  mc_list = utils.ReadFile(mc_file).splitlines()
527
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
528
  return ConfdClient(hmac_key, mc_list, callback)