Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 6c881c52

History | View | Annotate | Download (12.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
# pylint: disable-msg=E0203
49

    
50
# E0203: Access to member %r before its definition, since we use
51
# objects.py which doesn't explicitely initialise its members
52

    
53
import socket
54
import time
55
import random
56

    
57
from ganeti import utils
58
from ganeti import constants
59
from ganeti import objects
60
from ganeti import serializer
61
from ganeti import daemon # contains AsyncUDPSocket
62
from ganeti import errors
63
from ganeti import confd
64

    
65

    
66
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
67
  """Confd udp asyncore client
68

69
  This is kept separate from the main ConfdClient to make sure it's easy to
70
  implement a non-asyncore based client library.
71

72
  """
73
  def __init__(self, client):
74
    """Constructor for ConfdAsyncUDPClient
75

76
    @type client: L{ConfdClient}
77
    @param client: client library, to pass the datagrams to
78

79
    """
80
    daemon.AsyncUDPSocket.__init__(self)
81
    self.client = client
82

    
83
  # this method is overriding a daemon.AsyncUDPSocket method
84
  def handle_datagram(self, payload, ip, port):
85
    self.client.HandleResponse(payload, ip, port)
86

    
87

    
88
class ConfdClient:
89
  """Send queries to confd, and get back answers.
90

91
  Since the confd model works by querying multiple master candidates, and
92
  getting back answers, this is an asynchronous library. It can either work
93
  through asyncore or with your own handling.
94

95
  """
96
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
97
    """Constructor for ConfdClient
98

99
    @type hmac_key: string
100
    @param hmac_key: hmac key to talk to confd
101
    @type peers: list
102
    @param peers: list of peer nodes
103
    @type callback: f(L{ConfdUpcallPayload})
104
    @param callback: function to call when getting answers
105
    @type port: integer
106
    @keyword port: confd port (default: use GetDaemonPort)
107
    @type logger: logging.Logger
108
    @keyword logger: optional logger for internal conditions
109

110
    """
111
    if not callable(callback):
112
      raise errors.ProgrammerError("callback must be callable")
113

    
114
    self.UpdatePeerList(peers)
115
    self._hmac_key = hmac_key
116
    self._socket = ConfdAsyncUDPClient(self)
117
    self._callback = callback
118
    self._confd_port = port
119
    self._logger = logger
120
    self._requests = {}
121
    self._expire_requests = []
122

    
123
    if self._confd_port is None:
124
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
125

    
126
  def UpdatePeerList(self, peers):
127
    """Update the list of peers
128

129
    @type peers: list
130
    @param peers: list of peer nodes
131

132
    """
133
    if not isinstance(peers, list):
134
      raise errors.ProgrammerError("peers must be a list")
135
    self._peers = peers
136

    
137
  def _PackRequest(self, request, now=None):
138
    """Prepare a request to be sent on the wire.
139

140
    This function puts a proper salt in a confd request, puts the proper salt,
141
    and adds the correct magic number.
142

143
    """
144
    if now is None:
145
      now = time.time()
146
    tstamp = '%d' % now
147
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
148
    return confd.PackMagic(req)
149

    
150
  def _UnpackReply(self, payload):
151
    in_payload = confd.UnpackMagic(payload)
152
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
153
    answer = objects.ConfdReply.FromDict(dict_answer)
154
    return answer, salt
155

    
156
  def ExpireRequests(self):
157
    """Delete all the expired requests.
158

159
    """
160
    now = time.time()
161
    while self._expire_requests:
162
      expire_time, rsalt = self._expire_requests[0]
163
      if now >= expire_time:
164
        self._expire_requests.pop(0)
165
        (request, args) = self._requests[rsalt]
166
        del self._requests[rsalt]
167
        client_reply = ConfdUpcallPayload(salt=rsalt,
168
                                          type=UPCALL_EXPIRE,
169
                                          orig_request=request,
170
                                          extra_args=args,
171
                                          client=self,
172
                                          )
173
        self._callback(client_reply)
174
      else:
175
        break
176

    
177
  def SendRequest(self, request, args=None, coverage=None):
178
    """Send a confd request to some MCs
179

180
    @type request: L{objects.ConfdRequest}
181
    @param request: the request to send
182
    @type args: tuple
183
    @keyword args: additional callback arguments
184
    @type coverage: integer
185
    @keyword coverage: number of remote nodes to contact
186

187
    """
188
    if coverage is None:
189
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
190

    
191
    if coverage > len(self._peers):
192
      raise errors.ConfdClientError("Not enough MCs known to provide the"
193
                                    " desired coverage")
194

    
195
    if not request.rsalt:
196
      raise errors.ConfdClientError("Missing request rsalt")
197

    
198
    self.ExpireRequests()
199
    if request.rsalt in self._requests:
200
      raise errors.ConfdClientError("Duplicate request rsalt")
201

    
202
    if request.type not in constants.CONFD_REQS:
203
      raise errors.ConfdClientError("Invalid request type")
204

    
205
    random.shuffle(self._peers)
206
    targets = self._peers[:coverage]
207

    
208
    now = time.time()
209
    payload = self._PackRequest(request, now=now)
210

    
211
    for target in targets:
212
      try:
213
        self._socket.enqueue_send(target, self._confd_port, payload)
214
      except errors.UdpDataSizeError:
215
        raise errors.ConfdClientError("Request too big")
216

    
217
    self._requests[request.rsalt] = (request, args)
218
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
219
    self._expire_requests.append((expire_time, request.rsalt))
220

    
221
  def HandleResponse(self, payload, ip, port):
222
    """Asynchronous handler for a confd reply
223

224
    Call the relevant callback associated to the current request.
225

226
    """
227
    try:
228
      try:
229
        answer, salt = self._UnpackReply(payload)
230
      except (errors.SignatureError, errors.ConfdMagicError), err:
231
        if self._logger:
232
          self._logger.debug("Discarding broken package: %s" % err)
233
        return
234

    
235
      try:
236
        (request, args) = self._requests[salt]
237
      except KeyError:
238
        if self._logger:
239
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
240
        return
241

    
242
      client_reply = ConfdUpcallPayload(salt=salt,
243
                                        type=UPCALL_REPLY,
244
                                        server_reply=answer,
245
                                        orig_request=request,
246
                                        server_ip=ip,
247
                                        server_port=port,
248
                                        extra_args=args,
249
                                        client=self,
250
                                       )
251
      self._callback(client_reply)
252

    
253
    finally:
254
      self.ExpireRequests()
255

    
256

    
257
# UPCALL_REPLY: server reply upcall
258
# has all ConfdUpcallPayload fields populated
259
UPCALL_REPLY = 1
260
# UPCALL_EXPIRE: internal library request expire
261
# has only salt, type, orig_request and extra_args
262
UPCALL_EXPIRE = 2
263
CONFD_UPCALL_TYPES = frozenset([
264
  UPCALL_REPLY,
265
  UPCALL_EXPIRE,
266
  ])
267

    
268

    
269
class ConfdUpcallPayload(objects.ConfigObject):
270
  """Callback argument for confd replies
271

272
  @type salt: string
273
  @ivar salt: salt associated with the query
274
  @type type: one of confd.client.CONFD_UPCALL_TYPES
275
  @ivar type: upcall type (server reply, expired request, ...)
276
  @type orig_request: L{objects.ConfdRequest}
277
  @ivar orig_request: original request
278
  @type server_reply: L{objects.ConfdReply}
279
  @ivar server_reply: server reply
280
  @type server_ip: string
281
  @ivar server_ip: answering server ip address
282
  @type server_port: int
283
  @ivar server_port: answering server port
284
  @type extra_args: any
285
  @ivar extra_args: 'args' argument of the SendRequest function
286
  @type client: L{ConfdClient}
287
  @ivar client: current confd client instance
288

289
  """
290
  __slots__ = [
291
    "salt",
292
    "type",
293
    "orig_request",
294
    "server_reply",
295
    "server_ip",
296
    "server_port",
297
    "extra_args",
298
    "client",
299
    ]
300

    
301

    
302
class ConfdClientRequest(objects.ConfdRequest):
303
  """This is the client-side version of ConfdRequest.
304

305
  This version of the class helps creating requests, on the client side, by
306
  filling in some default values.
307

308
  """
309
  def __init__(self, **kwargs):
310
    objects.ConfdRequest.__init__(self, **kwargs)
311
    if not self.rsalt:
312
      self.rsalt = utils.NewUUID()
313
    if not self.protocol:
314
      self.protocol = constants.CONFD_PROTOCOL_VERSION
315
    if self.type not in constants.CONFD_REQS:
316
      raise errors.ConfdClientError("Invalid request type")
317

    
318

    
319
class ConfdFilterCallback:
320
  """Callback that calls another callback, but filters duplicate results.
321

322
  """
323
  def __init__(self, callback, logger=None):
324
    """Constructor for ConfdFilterCallback
325

326
    @type callback: f(L{ConfdUpcallPayload})
327
    @param callback: function to call when getting answers
328
    @type logger: logging.Logger
329
    @keyword logger: optional logger for internal conditions
330

331
    """
332
    if not callable(callback):
333
      raise errors.ProgrammerError("callback must be callable")
334

    
335
    self._callback = callback
336
    self._logger = logger
337
    # answers contains a dict of salt -> answer
338
    self._answers = {}
339

    
340
  def _LogFilter(self, salt, new_reply, old_reply):
341
    if not self._logger:
342
      return
343

    
344
    if new_reply.serial > old_reply.serial:
345
      self._logger.debug("Filtering confirming answer, with newer"
346
                         " serial for query %s" % salt)
347
    elif new_reply.serial == old_reply.serial:
348
      if new_reply.answer != old_reply.answer:
349
        self._logger.warning("Got incoherent answers for query %s"
350
                             " (serial: %s)" % (salt, new_reply.serial))
351
      else:
352
        self._logger.debug("Filtering confirming answer, with same"
353
                           " serial for query %s" % salt)
354
    else:
355
      self._logger.debug("Filtering outdated answer for query %s"
356
                         " serial: (%d < %d)" % (salt, old_reply.serial,
357
                                                 new_reply.serial))
358

    
359
  def _HandleExpire(self, up):
360
    # if we have no answer we have received none, before the expiration.
361
    if up.salt in self._answers:
362
      del self._answers[up.salt]
363

    
364
  def _HandleReply(self, up):
365
    """Handle a single confd reply, and decide whether to filter it.
366

367
    @rtype: boolean
368
    @return: True if the reply should be filtered, False if it should be passed
369
             on to the up-callback
370

371
    """
372
    filter_upcall = False
373
    salt = up.salt
374
    if salt not in self._answers:
375
      # first answer for a query (don't filter, and record)
376
      self._answers[salt] = up.server_reply
377
    elif up.server_reply.serial > self._answers[salt].serial:
378
      # newer answer (record, and compare contents)
379
      old_answer = self._answers[salt]
380
      self._answers[salt] = up.server_reply
381
      if up.server_reply.answer == old_answer.answer:
382
        # same content (filter) (version upgrade was unrelated)
383
        filter_upcall = True
384
        self._LogFilter(salt, up.server_reply, old_answer)
385
      # else: different content, pass up a second answer
386
    else:
387
      # older or same-version answer (duplicate or outdated, filter)
388
      filter_upcall = True
389
      self._LogFilter(salt, up.server_reply, self._answers[salt])
390

    
391
    return filter_upcall
392

    
393
  def __call__(self, up):
394
    """Filtering callback
395

396
    @type up: L{ConfdUpcallPayload}
397
    @param up: upper callback
398

399
    """
400
    filter_upcall = False
401
    if up.type == UPCALL_REPLY:
402
      filter_upcall = self._HandleReply(up)
403
    elif up.type == UPCALL_EXPIRE:
404
      self._HandleExpire(up)
405

    
406
    if not filter_upcall:
407
      self._callback(up)