Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 392ca296

History | View | Annotate | Download (12.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage:
33
  client = ConfdClient(...) # includes callback specification
34
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35
  client.SendRequest(req)
36
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
37
  # ... wait ...
38
  # And your callback will be called by asyncore, when your query gets a
39
  # response, or when it expires.
40

41
You can use the provided ConfdFilterCallback to act as a filter, only passing
42
"newer" answer to your callback, and filtering out outdated ones, or ones
43
confirming what you already got.
44

45
"""
46
import socket
47
import time
48
import random
49

    
50
from ganeti import utils
51
from ganeti import constants
52
from ganeti import objects
53
from ganeti import serializer
54
from ganeti import daemon # contains AsyncUDPSocket
55
from ganeti import errors
56
from ganeti import confd
57

    
58

    
59
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
60
  """Confd udp asyncore client
61

62
  This is kept separate from the main ConfdClient to make sure it's easy to
63
  implement a non-asyncore based client library.
64

65
  """
66
  def __init__(self, client):
67
    """Constructor for ConfdAsyncUDPClient
68

69
    @type client: L{ConfdClient}
70
    @param client: client library, to pass the datagrams to
71

72
    """
73
    daemon.AsyncUDPSocket.__init__(self)
74
    self.client = client
75

    
76
  # this method is overriding a daemon.AsyncUDPSocket method
77
  def handle_datagram(self, payload, ip, port):
78
    self.client.HandleResponse(payload, ip, port)
79

    
80

    
81
class ConfdClient:
82
  """Send queries to confd, and get back answers.
83

84
  Since the confd model works by querying multiple master candidates, and
85
  getting back answers, this is an asynchronous library. It can either work
86
  through asyncore or with your own handling.
87

88
  """
89
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
90
    """Constructor for ConfdClient
91

92
    @type hmac_key: string
93
    @param hmac_key: hmac key to talk to confd
94
    @type peers: list
95
    @param peers: list of peer nodes
96
    @type callback: f(L{ConfdUpcallPayload})
97
    @param callback: function to call when getting answers
98
    @type port: integer
99
    @keyword port: confd port (default: use GetDaemonPort)
100
    @type logger: L{logging.Logger}
101
    @keyword logger: optional logger for internal conditions
102

103
    """
104
    if not isinstance(peers, list):
105
      raise errors.ProgrammerError("peers must be a list")
106
    if not callable(callback):
107
      raise errors.ProgrammerError("callback must be callable")
108

    
109
    self._peers = peers
110
    self._hmac_key = hmac_key
111
    self._socket = ConfdAsyncUDPClient(self)
112
    self._callback = callback
113
    self._confd_port = port
114
    self._logger = logger
115
    self._requests = {}
116
    self._expire_requests = []
117

    
118
    if self._confd_port is None:
119
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
120

    
121
  def _PackRequest(self, request, now=None):
122
    """Prepare a request to be sent on the wire.
123

124
    This function puts a proper salt in a confd request, puts the proper salt,
125
    and adds the correct magic number.
126

127
    """
128
    if now is None:
129
      now = time.time()
130
    tstamp = '%d' % now
131
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
132
    return confd.PackMagic(req)
133

    
134
  def _UnpackReply(self, payload):
135
    in_payload = confd.UnpackMagic(payload)
136
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
137
    answer = objects.ConfdReply.FromDict(dict_answer)
138
    return answer, salt
139

    
140
  def ExpireRequests(self):
141
    """Delete all the expired requests.
142

143
    """
144
    now = time.time()
145
    while self._expire_requests:
146
      expire_time, rsalt = self._expire_requests[0]
147
      if now >= expire_time:
148
        self._expire_requests.pop(0)
149
        (request, args) = self._requests[rsalt]
150
        del self._requests[rsalt]
151
        client_reply = ConfdUpcallPayload(salt=rsalt,
152
                                          type=UPCALL_EXPIRE,
153
                                          orig_request=request,
154
                                          extra_args=args)
155
        self._callback(client_reply)
156
      else:
157
        break
158

    
159
  def SendRequest(self, request, args=None, coverage=None):
160
    """Send a confd request to some MCs
161

162
    @type request: L{objects.ConfdRequest}
163
    @param request: the request to send
164
    @type args: tuple
165
    @keyword args: additional callback arguments
166
    @type coverage: integer
167
    @keyword coverage: number of remote nodes to contact
168

169
    """
170
    if coverage is None:
171
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
172

    
173
    if coverage > len(self._peers):
174
      raise errors.ConfdClientError("Not enough MCs known to provide the"
175
                                    " desired coverage")
176

    
177
    if not request.rsalt:
178
      raise errors.ConfdClientError("Missing request rsalt")
179

    
180
    self.ExpireRequests()
181
    if request.rsalt in self._requests:
182
      raise errors.ConfdClientError("Duplicate request rsalt")
183

    
184
    if request.type not in constants.CONFD_REQS:
185
      raise errors.ConfdClientError("Invalid request type")
186

    
187
    random.shuffle(self._peers)
188
    targets = self._peers[:coverage]
189

    
190
    now = time.time()
191
    payload = self._PackRequest(request, now=now)
192

    
193
    for target in targets:
194
      try:
195
        self._socket.enqueue_send(target, self._confd_port, payload)
196
      except errors.UdpDataSizeError:
197
        raise errors.ConfdClientError("Request too big")
198

    
199
    self._requests[request.rsalt] = (request, args)
200
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
201
    self._expire_requests.append((expire_time, request.rsalt))
202

    
203
  def HandleResponse(self, payload, ip, port):
204
    """Asynchronous handler for a confd reply
205

206
    Call the relevant callback associated to the current request.
207

208
    """
209
    try:
210
      try:
211
        answer, salt = self._UnpackReply(payload)
212
      except (errors.SignatureError, errors.ConfdMagicError), err:
213
        if self._logger:
214
          self._logger.debug("Discarding broken package: %s" % err)
215
        return
216

    
217
      try:
218
        (request, args) = self._requests[salt]
219
      except KeyError:
220
        if self._logger:
221
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
222
        return
223

    
224
      client_reply = ConfdUpcallPayload(salt=salt,
225
                                        type=UPCALL_REPLY,
226
                                        server_reply=answer,
227
                                        orig_request=request,
228
                                        server_ip=ip,
229
                                        server_port=port,
230
                                        extra_args=args)
231
      self._callback(client_reply)
232

    
233
    finally:
234
      self.ExpireRequests()
235

    
236

    
237
# UPCALL_REPLY: server reply upcall
238
# has all ConfdUpcallPayload fields populated
239
UPCALL_REPLY = 1
240
# UPCALL_EXPIRE: internal library request expire
241
# has only salt, type, orig_request and extra_args
242
UPCALL_EXPIRE = 2
243
CONFD_UPCALL_TYPES = frozenset([
244
  UPCALL_REPLY,
245
  UPCALL_EXPIRE,
246
  ])
247

    
248

    
249
class ConfdUpcallPayload(objects.ConfigObject):
250
  """Callback argument for confd replies
251

252
  @type salt: string
253
  @ivar salt: salt associated with the query
254
  @type type: one of confd.client.CONFD_UPCALL_TYPES
255
  @ivar type: upcall type (server reply, expired request, ...)
256
  @type orig_request: L{objects.ConfdRequest}
257
  @ivar orig_request: original request
258
  @type server_reply: L{objects.ConfdReply}
259
  @ivar server_reply: server reply
260
  @type server_ip: string
261
  @ivar server_ip: answering server ip address
262
  @type server_port: int
263
  @ivar server_port: answering server port
264
  @type extra_args: any
265
  @ivar extra_args: 'args' argument of the SendRequest function
266

267
  """
268
  __slots__ = [
269
    "salt",
270
    "type",
271
    "orig_request",
272
    "server_reply",
273
    "server_ip",
274
    "server_port",
275
    "extra_args",
276
    ]
277

    
278

    
279
class ConfdClientRequest(objects.ConfdRequest):
280
  """This is the client-side version of ConfdRequest.
281

282
  This version of the class helps creating requests, on the client side, by
283
  filling in some default values.
284

285
  """
286
  def __init__(self, **kwargs):
287
    objects.ConfdRequest.__init__(self, **kwargs)
288
    if not self.rsalt:
289
      self.rsalt = utils.NewUUID()
290
    if not self.protocol:
291
      self.protocol = constants.CONFD_PROTOCOL_VERSION
292
    if self.type not in constants.CONFD_REQS:
293
      raise errors.ConfdClientError("Invalid request type")
294

    
295

    
296
class ConfdFilterCallback:
297
  """Callback that calls another callback, but filters duplicate results.
298

299
  """
300
  def __init__(self, callback, logger=None):
301
    """Constructor for ConfdFilterCallback
302

303
    @type callback: f(L{ConfdUpcallPayload})
304
    @param callback: function to call when getting answers
305
    @type logger: L{logging.Logger}
306
    @keyword logger: optional logger for internal conditions
307

308
    """
309
    if not callable(callback):
310
      raise errors.ProgrammerError("callback must be callable")
311

    
312
    self._callback = callback
313
    self._logger = logger
314
    # answers contains a dict of salt -> answer
315
    self._answers = {}
316

    
317
  def _LogFilter(self, salt, new_reply, old_reply):
318
    if not self._logger:
319
      return
320

    
321
    if new_reply.serial > old_reply.serial:
322
      self._logger.debug("Filtering confirming answer, with newer"
323
                         " serial for query %s" % salt)
324
    elif new_reply.serial == old_reply.serial:
325
      if new_reply.answer != old_reply.answer:
326
        self._logger.warning("Got incoherent answers for query %s"
327
                             " (serial: %s)" % (salt, new_reply.serial))
328
      else:
329
        self._logger.debug("Filtering confirming answer, with same"
330
                           " serial for query %s" % salt)
331
    else:
332
      self._logger.debug("Filtering outdated answer for query %s"
333
                         " serial: (%d < %d)" % (salt, old_reply.serial,
334
                                                 new_reply.serial))
335

    
336
  def _HandleExpire(self, up):
337
    # if we have no answer we have received none, before the expiration.
338
    if salt in self._answers:
339
      del self._answers[salt]
340

    
341
  def _HandleReply(self, up):
342
    """Handle a single confd reply, and decide whether to filter it.
343

344
    @rtype: boolean
345
    @return: True if the reply should be filtered, False if it should be passed
346
             on to the up-callback
347

348
    """
349
    filter_upcall = False
350
    salt = up.salt
351
    if salt not in self._answers:
352
      # first answer for a query (don't filter, and record)
353
      self._answers[salt] = up.server_reply
354
    elif up.server_reply.serial > self._answers[salt].serial:
355
      # newer answer (record, and compare contents)
356
      old_answer = self._answers[salt]
357
      self._answers[salt] = up.server_reply
358
      if up.server_reply.answer == old_answer.answer:
359
        # same content (filter) (version upgrade was unrelated)
360
        filter_upcall = True
361
        self._LogFilter(salt, up.server_reply, old_answer)
362
      # else: different content, pass up a second answer
363
    else:
364
      # older or same-version answer (duplicate or outdated, filter)
365
      filter_upcall = True
366
      self._LogFilter(salt, up.server_reply, self._answers[salt])
367

    
368
    return filter_upcall
369

    
370
  def __call__(self, up):
371
    """Filtering callback
372

373
    @type up: L{ConfdUpcallPayload}
374
    @param up: upper callback
375

376
    """
377
    filter_upcall = False
378
    if up.type == UPCALL_REPLY:
379
      filter_upcall = self._HandleReply(up)
380
    elif up.type == UPCALL_EXPIRE:
381
      self._HandleExpire(up)
382

    
383
    if not filter_upcall:
384
      self._callback(up)
385