Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 5f6f260a

History | View | Annotate | Download (12.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage:
33
  client = ConfdClient(...) # includes callback specification
34
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
35
  client.SendRequest(req)
36
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
37
  # ... wait ...
38
  # And your callback will be called by asyncore, when your query gets a
39
  # response, or when it expires.
40

41
You can use the provided ConfdFilterCallback to act as a filter, only passing
42
"newer" answer to your callback, and filtering out outdated ones, or ones
43
confirming what you already got.
44

45
"""
46
import socket
47
import time
48
import random
49

    
50
from ganeti import utils
51
from ganeti import constants
52
from ganeti import objects
53
from ganeti import serializer
54
from ganeti import daemon # contains AsyncUDPSocket
55
from ganeti import errors
56
from ganeti import confd
57

    
58

    
59
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
60
  """Confd udp asyncore client
61

62
  This is kept separate from the main ConfdClient to make sure it's easy to
63
  implement a non-asyncore based client library.
64

65
  """
66
  def __init__(self, client):
67
    """Constructor for ConfdAsyncUDPClient
68

69
    @type client: L{ConfdClient}
70
    @param client: client library, to pass the datagrams to
71

72
    """
73
    daemon.AsyncUDPSocket.__init__(self)
74
    self.client = client
75

    
76
  # this method is overriding a daemon.AsyncUDPSocket method
77
  def handle_datagram(self, payload, ip, port):
78
    self.client.HandleResponse(payload, ip, port)
79

    
80

    
81
class ConfdClient:
82
  """Send queries to confd, and get back answers.
83

84
  Since the confd model works by querying multiple master candidates, and
85
  getting back answers, this is an asynchronous library. It can either work
86
  through asyncore or with your own handling.
87

88
  """
89
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
90
    """Constructor for ConfdClient
91

92
    @type hmac_key: string
93
    @param hmac_key: hmac key to talk to confd
94
    @type peers: list
95
    @param peers: list of peer nodes
96
    @type callback: f(L{ConfdUpcallPayload})
97
    @param callback: function to call when getting answers
98
    @type port: integer
99
    @keyword port: confd port (default: use GetDaemonPort)
100
    @type logger: L{logging.Logger}
101
    @keyword logger: optional logger for internal conditions
102

103
    """
104
    if not isinstance(peers, list):
105
      raise errors.ProgrammerError("peers must be a list")
106
    if not callable(callback):
107
      raise errors.ProgrammerError("callback must be callable")
108

    
109
    self._peers = peers
110
    self._hmac_key = hmac_key
111
    self._socket = ConfdAsyncUDPClient(self)
112
    self._callback = callback
113
    self._confd_port = port
114
    self._logger = logger
115
    self._requests = {}
116
    self._expire_requests = []
117

    
118
    if self._confd_port is None:
119
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
120

    
121
  def _PackRequest(self, request, now=None):
122
    """Prepare a request to be sent on the wire.
123

124
    This function puts a proper salt in a confd request, puts the proper salt,
125
    and adds the correct magic number.
126

127
    """
128
    if now is None:
129
      now = time.time()
130
    tstamp = '%d' % now
131
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
132
    return confd.PackMagic(req)
133

    
134
  def _UnpackReply(self, payload):
135
    in_payload = confd.UnpackMagic(payload)
136
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
137
    answer = objects.ConfdReply.FromDict(dict_answer)
138
    return answer, salt
139

    
140
  def ExpireRequests(self):
141
    """Delete all the expired requests.
142

143
    """
144
    now = time.time()
145
    while self._expire_requests:
146
      expire_time, rsalt = self._expire_requests[0]
147
      if now >= expire_time:
148
        self._expire_requests.pop(0)
149
        (request, args) = self._requests[rsalt]
150
        del self._requests[rsalt]
151
        client_reply = ConfdUpcallPayload(salt=rsalt,
152
                                          type=UPCALL_EXPIRE,
153
                                          orig_request=request,
154
                                          extra_args=args,
155
                                          client=self,
156
                                          )
157
        self._callback(client_reply)
158
      else:
159
        break
160

    
161
  def SendRequest(self, request, args=None, coverage=None):
162
    """Send a confd request to some MCs
163

164
    @type request: L{objects.ConfdRequest}
165
    @param request: the request to send
166
    @type args: tuple
167
    @keyword args: additional callback arguments
168
    @type coverage: integer
169
    @keyword coverage: number of remote nodes to contact
170

171
    """
172
    if coverage is None:
173
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
174

    
175
    if coverage > len(self._peers):
176
      raise errors.ConfdClientError("Not enough MCs known to provide the"
177
                                    " desired coverage")
178

    
179
    if not request.rsalt:
180
      raise errors.ConfdClientError("Missing request rsalt")
181

    
182
    self.ExpireRequests()
183
    if request.rsalt in self._requests:
184
      raise errors.ConfdClientError("Duplicate request rsalt")
185

    
186
    if request.type not in constants.CONFD_REQS:
187
      raise errors.ConfdClientError("Invalid request type")
188

    
189
    random.shuffle(self._peers)
190
    targets = self._peers[:coverage]
191

    
192
    now = time.time()
193
    payload = self._PackRequest(request, now=now)
194

    
195
    for target in targets:
196
      try:
197
        self._socket.enqueue_send(target, self._confd_port, payload)
198
      except errors.UdpDataSizeError:
199
        raise errors.ConfdClientError("Request too big")
200

    
201
    self._requests[request.rsalt] = (request, args)
202
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
203
    self._expire_requests.append((expire_time, request.rsalt))
204

    
205
  def HandleResponse(self, payload, ip, port):
206
    """Asynchronous handler for a confd reply
207

208
    Call the relevant callback associated to the current request.
209

210
    """
211
    try:
212
      try:
213
        answer, salt = self._UnpackReply(payload)
214
      except (errors.SignatureError, errors.ConfdMagicError), err:
215
        if self._logger:
216
          self._logger.debug("Discarding broken package: %s" % err)
217
        return
218

    
219
      try:
220
        (request, args) = self._requests[salt]
221
      except KeyError:
222
        if self._logger:
223
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
224
        return
225

    
226
      client_reply = ConfdUpcallPayload(salt=salt,
227
                                        type=UPCALL_REPLY,
228
                                        server_reply=answer,
229
                                        orig_request=request,
230
                                        server_ip=ip,
231
                                        server_port=port,
232
                                        extra_args=args,
233
                                        client=self,
234
                                       )
235
      self._callback(client_reply)
236

    
237
    finally:
238
      self.ExpireRequests()
239

    
240

    
241
# UPCALL_REPLY: server reply upcall
242
# has all ConfdUpcallPayload fields populated
243
UPCALL_REPLY = 1
244
# UPCALL_EXPIRE: internal library request expire
245
# has only salt, type, orig_request and extra_args
246
UPCALL_EXPIRE = 2
247
CONFD_UPCALL_TYPES = frozenset([
248
  UPCALL_REPLY,
249
  UPCALL_EXPIRE,
250
  ])
251

    
252

    
253
class ConfdUpcallPayload(objects.ConfigObject):
254
  """Callback argument for confd replies
255

256
  @type salt: string
257
  @ivar salt: salt associated with the query
258
  @type type: one of confd.client.CONFD_UPCALL_TYPES
259
  @ivar type: upcall type (server reply, expired request, ...)
260
  @type orig_request: L{objects.ConfdRequest}
261
  @ivar orig_request: original request
262
  @type server_reply: L{objects.ConfdReply}
263
  @ivar server_reply: server reply
264
  @type server_ip: string
265
  @ivar server_ip: answering server ip address
266
  @type server_port: int
267
  @ivar server_port: answering server port
268
  @type extra_args: any
269
  @ivar extra_args: 'args' argument of the SendRequest function
270
  @type client: L{ConfdClient}
271
  @ivar client: current confd client instance
272

273
  """
274
  __slots__ = [
275
    "salt",
276
    "type",
277
    "orig_request",
278
    "server_reply",
279
    "server_ip",
280
    "server_port",
281
    "extra_args",
282
    "client",
283
    ]
284

    
285

    
286
class ConfdClientRequest(objects.ConfdRequest):
287
  """This is the client-side version of ConfdRequest.
288

289
  This version of the class helps creating requests, on the client side, by
290
  filling in some default values.
291

292
  """
293
  def __init__(self, **kwargs):
294
    objects.ConfdRequest.__init__(self, **kwargs)
295
    if not self.rsalt:
296
      self.rsalt = utils.NewUUID()
297
    if not self.protocol:
298
      self.protocol = constants.CONFD_PROTOCOL_VERSION
299
    if self.type not in constants.CONFD_REQS:
300
      raise errors.ConfdClientError("Invalid request type")
301

    
302

    
303
class ConfdFilterCallback:
304
  """Callback that calls another callback, but filters duplicate results.
305

306
  """
307
  def __init__(self, callback, logger=None):
308
    """Constructor for ConfdFilterCallback
309

310
    @type callback: f(L{ConfdUpcallPayload})
311
    @param callback: function to call when getting answers
312
    @type logger: L{logging.Logger}
313
    @keyword logger: optional logger for internal conditions
314

315
    """
316
    if not callable(callback):
317
      raise errors.ProgrammerError("callback must be callable")
318

    
319
    self._callback = callback
320
    self._logger = logger
321
    # answers contains a dict of salt -> answer
322
    self._answers = {}
323

    
324
  def _LogFilter(self, salt, new_reply, old_reply):
325
    if not self._logger:
326
      return
327

    
328
    if new_reply.serial > old_reply.serial:
329
      self._logger.debug("Filtering confirming answer, with newer"
330
                         " serial for query %s" % salt)
331
    elif new_reply.serial == old_reply.serial:
332
      if new_reply.answer != old_reply.answer:
333
        self._logger.warning("Got incoherent answers for query %s"
334
                             " (serial: %s)" % (salt, new_reply.serial))
335
      else:
336
        self._logger.debug("Filtering confirming answer, with same"
337
                           " serial for query %s" % salt)
338
    else:
339
      self._logger.debug("Filtering outdated answer for query %s"
340
                         " serial: (%d < %d)" % (salt, old_reply.serial,
341
                                                 new_reply.serial))
342

    
343
  def _HandleExpire(self, up):
344
    # if we have no answer we have received none, before the expiration.
345
    if up.salt in self._answers:
346
      del self._answers[up.salt]
347

    
348
  def _HandleReply(self, up):
349
    """Handle a single confd reply, and decide whether to filter it.
350

351
    @rtype: boolean
352
    @return: True if the reply should be filtered, False if it should be passed
353
             on to the up-callback
354

355
    """
356
    filter_upcall = False
357
    salt = up.salt
358
    if salt not in self._answers:
359
      # first answer for a query (don't filter, and record)
360
      self._answers[salt] = up.server_reply
361
    elif up.server_reply.serial > self._answers[salt].serial:
362
      # newer answer (record, and compare contents)
363
      old_answer = self._answers[salt]
364
      self._answers[salt] = up.server_reply
365
      if up.server_reply.answer == old_answer.answer:
366
        # same content (filter) (version upgrade was unrelated)
367
        filter_upcall = True
368
        self._LogFilter(salt, up.server_reply, old_answer)
369
      # else: different content, pass up a second answer
370
    else:
371
      # older or same-version answer (duplicate or outdated, filter)
372
      filter_upcall = True
373
      self._LogFilter(salt, up.server_reply, self._answers[salt])
374

    
375
    return filter_upcall
376

    
377
  def __call__(self, up):
378
    """Filtering callback
379

380
    @type up: L{ConfdUpcallPayload}
381
    @param up: upper callback
382

383
    """
384
    filter_upcall = False
385
    if up.type == UPCALL_REPLY:
386
      filter_upcall = self._HandleReply(up)
387
    elif up.type == UPCALL_EXPIRE:
388
      self._HandleExpire(up)
389

    
390
    if not filter_upcall:
391
      self._callback(up)
392