Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 71e114da

History | View | Annotate | Download (16.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitely initialise its members
52

    
53
import time
54
import random
55

    
56
from ganeti import utils
57
from ganeti import constants
58
from ganeti import objects
59
from ganeti import serializer
60
from ganeti import daemon # contains AsyncUDPSocket
61
from ganeti import errors
62
from ganeti import confd
63
from ganeti import ssconf
64

    
65

    
66
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
67
  """Confd udp asyncore client
68

69
  This is kept separate from the main ConfdClient to make sure it's easy to
70
  implement a non-asyncore based client library.
71

72
  """
73
  def __init__(self, client):
74
    """Constructor for ConfdAsyncUDPClient
75

76
    @type client: L{ConfdClient}
77
    @param client: client library, to pass the datagrams to
78

79
    """
80
    daemon.AsyncUDPSocket.__init__(self)
81
    self.client = client
82

    
83
  # this method is overriding a daemon.AsyncUDPSocket method
84
  def handle_datagram(self, payload, ip, port):
85
    self.client.HandleResponse(payload, ip, port)
86

    
87

    
88
class _Request(object):
89
  """Request status structure.
90

91
  @ivar request: the request data
92
  @ivar args: any extra arguments for the callback
93
  @ivar expiry: the expiry timestamp of the request
94

95
  """
96
  def __init__(self, request, args, expiry):
97
    self.request = request
98
    self.args = args
99
    self.expiry = expiry
100

    
101

    
102
class ConfdClient:
103
  """Send queries to confd, and get back answers.
104

105
  Since the confd model works by querying multiple master candidates, and
106
  getting back answers, this is an asynchronous library. It can either work
107
  through asyncore or with your own handling.
108

109
  @type _requests: dict
110
  @ivar _requests: dictionary indexes by salt, which contains data
111
      about the outstanding requests; the values are objects of type
112
      L{_Request}
113

114
  """
115
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
116
    """Constructor for ConfdClient
117

118
    @type hmac_key: string
119
    @param hmac_key: hmac key to talk to confd
120
    @type peers: list
121
    @param peers: list of peer nodes
122
    @type callback: f(L{ConfdUpcallPayload})
123
    @param callback: function to call when getting answers
124
    @type port: integer
125
    @param port: confd port (default: use GetDaemonPort)
126
    @type logger: logging.Logger
127
    @param logger: optional logger for internal conditions
128

129
    """
130
    if not callable(callback):
131
      raise errors.ProgrammerError("callback must be callable")
132

    
133
    self.UpdatePeerList(peers)
134
    self._hmac_key = hmac_key
135
    self._socket = ConfdAsyncUDPClient(self)
136
    self._callback = callback
137
    self._confd_port = port
138
    self._logger = logger
139
    self._requests = {}
140

    
141
    if self._confd_port is None:
142
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
143

    
144
  def UpdatePeerList(self, peers):
145
    """Update the list of peers
146

147
    @type peers: list
148
    @param peers: list of peer nodes
149

150
    """
151
    # we are actually called from init, so:
152
    # pylint: disable-msg=W0201
153
    if not isinstance(peers, list):
154
      raise errors.ProgrammerError("peers must be a list")
155
    # make a copy of peers, since we're going to shuffle the list, later
156
    self._peers = list(peers)
157

    
158
  def _PackRequest(self, request, now=None):
159
    """Prepare a request to be sent on the wire.
160

161
    This function puts a proper salt in a confd request, puts the proper salt,
162
    and adds the correct magic number.
163

164
    """
165
    if now is None:
166
      now = time.time()
167
    tstamp = '%d' % now
168
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
169
    return confd.PackMagic(req)
170

    
171
  def _UnpackReply(self, payload):
172
    in_payload = confd.UnpackMagic(payload)
173
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
174
    answer = objects.ConfdReply.FromDict(dict_answer)
175
    return answer, salt
176

    
177
  def ExpireRequests(self):
178
    """Delete all the expired requests.
179

180
    """
181
    now = time.time()
182
    for rsalt, rq in self._requests.items():
183
      if now >= rq.expiry:
184
        del self._requests[rsalt]
185
        client_reply = ConfdUpcallPayload(salt=rsalt,
186
                                          type=UPCALL_EXPIRE,
187
                                          orig_request=rq.request,
188
                                          extra_args=rq.args,
189
                                          client=self,
190
                                          )
191
        self._callback(client_reply)
192

    
193
  def SendRequest(self, request, args=None, coverage=None, async=True):
194
    """Send a confd request to some MCs
195

196
    @type request: L{objects.ConfdRequest}
197
    @param request: the request to send
198
    @type args: tuple
199
    @param args: additional callback arguments
200
    @type coverage: integer
201
    @param coverage: number of remote nodes to contact
202
    @type async: boolean
203
    @param async: handle the write asynchronously
204

205
    """
206
    if coverage is None:
207
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
208

    
209
    if coverage > len(self._peers):
210
      raise errors.ConfdClientError("Not enough MCs known to provide the"
211
                                    " desired coverage")
212

    
213
    if not request.rsalt:
214
      raise errors.ConfdClientError("Missing request rsalt")
215

    
216
    self.ExpireRequests()
217
    if request.rsalt in self._requests:
218
      raise errors.ConfdClientError("Duplicate request rsalt")
219

    
220
    if request.type not in constants.CONFD_REQS:
221
      raise errors.ConfdClientError("Invalid request type")
222

    
223
    random.shuffle(self._peers)
224
    targets = self._peers[:coverage]
225

    
226
    now = time.time()
227
    payload = self._PackRequest(request, now=now)
228

    
229
    for target in targets:
230
      try:
231
        self._socket.enqueue_send(target, self._confd_port, payload)
232
      except errors.UdpDataSizeError:
233
        raise errors.ConfdClientError("Request too big")
234

    
235
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
236
    self._requests[request.rsalt] = _Request(request, args, expire_time)
237

    
238
    if not async:
239
      self.FlushSendQueue()
240

    
241
  def HandleResponse(self, payload, ip, port):
242
    """Asynchronous handler for a confd reply
243

244
    Call the relevant callback associated to the current request.
245

246
    """
247
    try:
248
      try:
249
        answer, salt = self._UnpackReply(payload)
250
      except (errors.SignatureError, errors.ConfdMagicError), err:
251
        if self._logger:
252
          self._logger.debug("Discarding broken package: %s" % err)
253
        return
254

    
255
      try:
256
        rq = self._requests[salt]
257
      except KeyError:
258
        if self._logger:
259
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
260
        return
261

    
262
      client_reply = ConfdUpcallPayload(salt=salt,
263
                                        type=UPCALL_REPLY,
264
                                        server_reply=answer,
265
                                        orig_request=rq.request,
266
                                        server_ip=ip,
267
                                        server_port=port,
268
                                        extra_args=rq.args,
269
                                        client=self,
270
                                       )
271
      self._callback(client_reply)
272

    
273
    finally:
274
      self.ExpireRequests()
275

    
276
  def FlushSendQueue(self):
277
    """Send out all pending requests.
278

279
    Can be used for synchronous client use.
280

281
    """
282
    while self._socket.writable():
283
      self._socket.handle_write()
284

    
285
  def ReceiveReply(self, timeout=1):
286
    """Receive one reply.
287

288
    @type timeout: float
289
    @param timeout: how long to wait for the reply
290
    @rtype: boolean
291
    @return: True if some data has been handled, False otherwise
292

293
    """
294
    return self._socket.process_next_packet(timeout=timeout)
295

    
296

    
297
# UPCALL_REPLY: server reply upcall
298
# has all ConfdUpcallPayload fields populated
299
UPCALL_REPLY = 1
300
# UPCALL_EXPIRE: internal library request expire
301
# has only salt, type, orig_request and extra_args
302
UPCALL_EXPIRE = 2
303
CONFD_UPCALL_TYPES = frozenset([
304
  UPCALL_REPLY,
305
  UPCALL_EXPIRE,
306
  ])
307

    
308

    
309
class ConfdUpcallPayload(objects.ConfigObject):
310
  """Callback argument for confd replies
311

312
  @type salt: string
313
  @ivar salt: salt associated with the query
314
  @type type: one of confd.client.CONFD_UPCALL_TYPES
315
  @ivar type: upcall type (server reply, expired request, ...)
316
  @type orig_request: L{objects.ConfdRequest}
317
  @ivar orig_request: original request
318
  @type server_reply: L{objects.ConfdReply}
319
  @ivar server_reply: server reply
320
  @type server_ip: string
321
  @ivar server_ip: answering server ip address
322
  @type server_port: int
323
  @ivar server_port: answering server port
324
  @type extra_args: any
325
  @ivar extra_args: 'args' argument of the SendRequest function
326
  @type client: L{ConfdClient}
327
  @ivar client: current confd client instance
328

329
  """
330
  __slots__ = [
331
    "salt",
332
    "type",
333
    "orig_request",
334
    "server_reply",
335
    "server_ip",
336
    "server_port",
337
    "extra_args",
338
    "client",
339
    ]
340

    
341

    
342
class ConfdClientRequest(objects.ConfdRequest):
343
  """This is the client-side version of ConfdRequest.
344

345
  This version of the class helps creating requests, on the client side, by
346
  filling in some default values.
347

348
  """
349
  def __init__(self, **kwargs):
350
    objects.ConfdRequest.__init__(self, **kwargs)
351
    if not self.rsalt:
352
      self.rsalt = utils.NewUUID()
353
    if not self.protocol:
354
      self.protocol = constants.CONFD_PROTOCOL_VERSION
355
    if self.type not in constants.CONFD_REQS:
356
      raise errors.ConfdClientError("Invalid request type")
357

    
358

    
359
class ConfdFilterCallback:
360
  """Callback that calls another callback, but filters duplicate results.
361

362
  @ivar consistent: a dictionary indexed by salt; for each salt, if
363
      all responses ware identical, this will be True; this is the
364
      expected state on a healthy cluster; on inconsistent or
365
      partitioned clusters, this might be False, if we see answers
366
      with the same serial but different contents
367

368
  """
369
  def __init__(self, callback, logger=None):
370
    """Constructor for ConfdFilterCallback
371

372
    @type callback: f(L{ConfdUpcallPayload})
373
    @param callback: function to call when getting answers
374
    @type logger: logging.Logger
375
    @param logger: optional logger for internal conditions
376

377
    """
378
    if not callable(callback):
379
      raise errors.ProgrammerError("callback must be callable")
380

    
381
    self._callback = callback
382
    self._logger = logger
383
    # answers contains a dict of salt -> answer
384
    self._answers = {}
385
    self.consistent = {}
386

    
387
  def _LogFilter(self, salt, new_reply, old_reply):
388
    if not self._logger:
389
      return
390

    
391
    if new_reply.serial > old_reply.serial:
392
      self._logger.debug("Filtering confirming answer, with newer"
393
                         " serial for query %s" % salt)
394
    elif new_reply.serial == old_reply.serial:
395
      if new_reply.answer != old_reply.answer:
396
        self._logger.warning("Got incoherent answers for query %s"
397
                             " (serial: %s)" % (salt, new_reply.serial))
398
      else:
399
        self._logger.debug("Filtering confirming answer, with same"
400
                           " serial for query %s" % salt)
401
    else:
402
      self._logger.debug("Filtering outdated answer for query %s"
403
                         " serial: (%d < %d)" % (salt, old_reply.serial,
404
                                                 new_reply.serial))
405

    
406
  def _HandleExpire(self, up):
407
    # if we have no answer we have received none, before the expiration.
408
    if up.salt in self._answers:
409
      del self._answers[up.salt]
410
    if up.salt in self.consistent:
411
      del self.consistent[up.salt]
412

    
413
  def _HandleReply(self, up):
414
    """Handle a single confd reply, and decide whether to filter it.
415

416
    @rtype: boolean
417
    @return: True if the reply should be filtered, False if it should be passed
418
             on to the up-callback
419

420
    """
421
    filter_upcall = False
422
    salt = up.salt
423
    if salt not in self.consistent:
424
      self.consistent[salt] = True
425
    if salt not in self._answers:
426
      # first answer for a query (don't filter, and record)
427
      self._answers[salt] = up.server_reply
428
    elif up.server_reply.serial > self._answers[salt].serial:
429
      # newer answer (record, and compare contents)
430
      old_answer = self._answers[salt]
431
      self._answers[salt] = up.server_reply
432
      if up.server_reply.answer == old_answer.answer:
433
        # same content (filter) (version upgrade was unrelated)
434
        filter_upcall = True
435
        self._LogFilter(salt, up.server_reply, old_answer)
436
      # else: different content, pass up a second answer
437
    else:
438
      # older or same-version answer (duplicate or outdated, filter)
439
      if (up.server_reply.serial == self._answers[salt].serial and
440
          up.server_reply.answer != self._answers[salt].answer):
441
        self.consistent[salt] = False
442
      filter_upcall = True
443
      self._LogFilter(salt, up.server_reply, self._answers[salt])
444

    
445
    return filter_upcall
446

    
447
  def __call__(self, up):
448
    """Filtering callback
449

450
    @type up: L{ConfdUpcallPayload}
451
    @param up: upper callback
452

453
    """
454
    filter_upcall = False
455
    if up.type == UPCALL_REPLY:
456
      filter_upcall = self._HandleReply(up)
457
    elif up.type == UPCALL_EXPIRE:
458
      self._HandleExpire(up)
459

    
460
    if not filter_upcall:
461
      self._callback(up)
462

    
463

    
464
class ConfdCountingCallback:
465
  """Callback that calls another callback, and counts the answers
466

467
  """
468
  def __init__(self, callback, logger=None):
469
    """Constructor for ConfdCountingCallback
470

471
    @type callback: f(L{ConfdUpcallPayload})
472
    @param callback: function to call when getting answers
473
    @type logger: logging.Logger
474
    @param logger: optional logger for internal conditions
475

476
    """
477
    if not callable(callback):
478
      raise errors.ProgrammerError("callback must be callable")
479

    
480
    self._callback = callback
481
    self._logger = logger
482
    # answers contains a dict of salt -> count
483
    self._answers = {}
484

    
485
  def RegisterQuery(self, salt):
486
    if salt in self._answers:
487
      raise errors.ProgrammerError("query already registered")
488
    self._answers[salt] = 0
489

    
490
  def AllAnswered(self):
491
    """Have all the registered queries received at least an answer?
492

493
    """
494
    return utils.all(self._answers.values())
495

    
496
  def _HandleExpire(self, up):
497
    # if we have no answer we have received none, before the expiration.
498
    if up.salt in self._answers:
499
      del self._answers[up.salt]
500

    
501
  def _HandleReply(self, up):
502
    """Handle a single confd reply, and decide whether to filter it.
503

504
    @rtype: boolean
505
    @return: True if the reply should be filtered, False if it should be passed
506
             on to the up-callback
507

508
    """
509
    if up.salt in self._answers:
510
      self._answers[up.salt] += 1
511

    
512
  def __call__(self, up):
513
    """Filtering callback
514

515
    @type up: L{ConfdUpcallPayload}
516
    @param up: upper callback
517

518
    """
519
    if up.type == UPCALL_REPLY:
520
      self._HandleReply(up)
521
    elif up.type == UPCALL_EXPIRE:
522
      self._HandleExpire(up)
523
    self._callback(up)
524

    
525

    
526
def GetConfdClient(callback):
527
  """Return a client configured using the given callback.
528

529
  This is handy to abstract the MC list and HMAC key reading.
530

531
  @attention: This should only be called on nodes which are part of a
532
      cluster, since it depends on a valid (ganeti) data directory;
533
      for code running outside of a cluster, you need to create the
534
      client manually
535

536
  """
537
  ss = ssconf.SimpleStore()
538
  mc_file = ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS)
539
  mc_list = utils.ReadFile(mc_file).splitlines()
540
  hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
541
  return ConfdClient(hmac_key, mc_list, callback)