Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 9b94905f

History | View | Annotate | Download (12.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
Clients can use the confd client library to send requests to a group of master
25
candidates running confd. The expected usage is through the asyncore framework,
26
by sending queries, and asynchronously receiving replies through a callback.
27

28
This way the client library doesn't ever need to "wait" on a particular answer,
29
and can proceed even if some udp packets are lost. It's up to the user to
30
reschedule queries if they haven't received responses and they need them.
31

32
Example usage::
33

34
  client = ConfdClient(...) # includes callback specification
35
  req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING)
36
  client.SendRequest(req)
37
  # then make sure your client calls asyncore.loop() or daemon.Mainloop.Run()
38
  # ... wait ...
39
  # And your callback will be called by asyncore, when your query gets a
40
  # response, or when it expires.
41

42
You can use the provided ConfdFilterCallback to act as a filter, only passing
43
"newer" answer to your callback, and filtering out outdated ones, or ones
44
confirming what you already got.
45

46
"""
47

    
48
import socket
49
import time
50
import random
51

    
52
from ganeti import utils
53
from ganeti import constants
54
from ganeti import objects
55
from ganeti import serializer
56
from ganeti import daemon # contains AsyncUDPSocket
57
from ganeti import errors
58
from ganeti import confd
59

    
60

    
61
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
62
  """Confd udp asyncore client
63

64
  This is kept separate from the main ConfdClient to make sure it's easy to
65
  implement a non-asyncore based client library.
66

67
  """
68
  def __init__(self, client):
69
    """Constructor for ConfdAsyncUDPClient
70

71
    @type client: L{ConfdClient}
72
    @param client: client library, to pass the datagrams to
73

74
    """
75
    daemon.AsyncUDPSocket.__init__(self)
76
    self.client = client
77

    
78
  # this method is overriding a daemon.AsyncUDPSocket method
79
  def handle_datagram(self, payload, ip, port):
80
    self.client.HandleResponse(payload, ip, port)
81

    
82

    
83
class ConfdClient:
84
  """Send queries to confd, and get back answers.
85

86
  Since the confd model works by querying multiple master candidates, and
87
  getting back answers, this is an asynchronous library. It can either work
88
  through asyncore or with your own handling.
89

90
  """
91
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
92
    """Constructor for ConfdClient
93

94
    @type hmac_key: string
95
    @param hmac_key: hmac key to talk to confd
96
    @type peers: list
97
    @param peers: list of peer nodes
98
    @type callback: f(L{ConfdUpcallPayload})
99
    @param callback: function to call when getting answers
100
    @type port: integer
101
    @keyword port: confd port (default: use GetDaemonPort)
102
    @type logger: logging.Logger
103
    @keyword logger: optional logger for internal conditions
104

105
    """
106
    if not callable(callback):
107
      raise errors.ProgrammerError("callback must be callable")
108

    
109
    self.UpdatePeerList(peers)
110
    self._hmac_key = hmac_key
111
    self._socket = ConfdAsyncUDPClient(self)
112
    self._callback = callback
113
    self._confd_port = port
114
    self._logger = logger
115
    self._requests = {}
116
    self._expire_requests = []
117

    
118
    if self._confd_port is None:
119
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
120

    
121
  def UpdatePeerList(self, peers):
122
    """Update the list of peers
123

124
    @type peers: list
125
    @param peers: list of peer nodes
126

127
    """
128
    if not isinstance(peers, list):
129
      raise errors.ProgrammerError("peers must be a list")
130
    self._peers = peers
131

    
132
  def _PackRequest(self, request, now=None):
133
    """Prepare a request to be sent on the wire.
134

135
    This function puts a proper salt in a confd request, puts the proper salt,
136
    and adds the correct magic number.
137

138
    """
139
    if now is None:
140
      now = time.time()
141
    tstamp = '%d' % now
142
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
143
    return confd.PackMagic(req)
144

    
145
  def _UnpackReply(self, payload):
146
    in_payload = confd.UnpackMagic(payload)
147
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
148
    answer = objects.ConfdReply.FromDict(dict_answer)
149
    return answer, salt
150

    
151
  def ExpireRequests(self):
152
    """Delete all the expired requests.
153

154
    """
155
    now = time.time()
156
    while self._expire_requests:
157
      expire_time, rsalt = self._expire_requests[0]
158
      if now >= expire_time:
159
        self._expire_requests.pop(0)
160
        (request, args) = self._requests[rsalt]
161
        del self._requests[rsalt]
162
        client_reply = ConfdUpcallPayload(salt=rsalt,
163
                                          type=UPCALL_EXPIRE,
164
                                          orig_request=request,
165
                                          extra_args=args,
166
                                          client=self,
167
                                          )
168
        self._callback(client_reply)
169
      else:
170
        break
171

    
172
  def SendRequest(self, request, args=None, coverage=None):
173
    """Send a confd request to some MCs
174

175
    @type request: L{objects.ConfdRequest}
176
    @param request: the request to send
177
    @type args: tuple
178
    @keyword args: additional callback arguments
179
    @type coverage: integer
180
    @keyword coverage: number of remote nodes to contact
181

182
    """
183
    if coverage is None:
184
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
185

    
186
    if coverage > len(self._peers):
187
      raise errors.ConfdClientError("Not enough MCs known to provide the"
188
                                    " desired coverage")
189

    
190
    if not request.rsalt:
191
      raise errors.ConfdClientError("Missing request rsalt")
192

    
193
    self.ExpireRequests()
194
    if request.rsalt in self._requests:
195
      raise errors.ConfdClientError("Duplicate request rsalt")
196

    
197
    if request.type not in constants.CONFD_REQS:
198
      raise errors.ConfdClientError("Invalid request type")
199

    
200
    random.shuffle(self._peers)
201
    targets = self._peers[:coverage]
202

    
203
    now = time.time()
204
    payload = self._PackRequest(request, now=now)
205

    
206
    for target in targets:
207
      try:
208
        self._socket.enqueue_send(target, self._confd_port, payload)
209
      except errors.UdpDataSizeError:
210
        raise errors.ConfdClientError("Request too big")
211

    
212
    self._requests[request.rsalt] = (request, args)
213
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
214
    self._expire_requests.append((expire_time, request.rsalt))
215

    
216
  def HandleResponse(self, payload, ip, port):
217
    """Asynchronous handler for a confd reply
218

219
    Call the relevant callback associated to the current request.
220

221
    """
222
    try:
223
      try:
224
        answer, salt = self._UnpackReply(payload)
225
      except (errors.SignatureError, errors.ConfdMagicError), err:
226
        if self._logger:
227
          self._logger.debug("Discarding broken package: %s" % err)
228
        return
229

    
230
      try:
231
        (request, args) = self._requests[salt]
232
      except KeyError:
233
        if self._logger:
234
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
235
        return
236

    
237
      client_reply = ConfdUpcallPayload(salt=salt,
238
                                        type=UPCALL_REPLY,
239
                                        server_reply=answer,
240
                                        orig_request=request,
241
                                        server_ip=ip,
242
                                        server_port=port,
243
                                        extra_args=args,
244
                                        client=self,
245
                                       )
246
      self._callback(client_reply)
247

    
248
    finally:
249
      self.ExpireRequests()
250

    
251

    
252
# UPCALL_REPLY: server reply upcall
253
# has all ConfdUpcallPayload fields populated
254
UPCALL_REPLY = 1
255
# UPCALL_EXPIRE: internal library request expire
256
# has only salt, type, orig_request and extra_args
257
UPCALL_EXPIRE = 2
258
CONFD_UPCALL_TYPES = frozenset([
259
  UPCALL_REPLY,
260
  UPCALL_EXPIRE,
261
  ])
262

    
263

    
264
class ConfdUpcallPayload(objects.ConfigObject):
265
  """Callback argument for confd replies
266

267
  @type salt: string
268
  @ivar salt: salt associated with the query
269
  @type type: one of confd.client.CONFD_UPCALL_TYPES
270
  @ivar type: upcall type (server reply, expired request, ...)
271
  @type orig_request: L{objects.ConfdRequest}
272
  @ivar orig_request: original request
273
  @type server_reply: L{objects.ConfdReply}
274
  @ivar server_reply: server reply
275
  @type server_ip: string
276
  @ivar server_ip: answering server ip address
277
  @type server_port: int
278
  @ivar server_port: answering server port
279
  @type extra_args: any
280
  @ivar extra_args: 'args' argument of the SendRequest function
281
  @type client: L{ConfdClient}
282
  @ivar client: current confd client instance
283

284
  """
285
  __slots__ = [
286
    "salt",
287
    "type",
288
    "orig_request",
289
    "server_reply",
290
    "server_ip",
291
    "server_port",
292
    "extra_args",
293
    "client",
294
    ]
295

    
296

    
297
class ConfdClientRequest(objects.ConfdRequest):
298
  """This is the client-side version of ConfdRequest.
299

300
  This version of the class helps creating requests, on the client side, by
301
  filling in some default values.
302

303
  """
304
  def __init__(self, **kwargs):
305
    objects.ConfdRequest.__init__(self, **kwargs)
306
    if not self.rsalt:
307
      self.rsalt = utils.NewUUID()
308
    if not self.protocol:
309
      self.protocol = constants.CONFD_PROTOCOL_VERSION
310
    if self.type not in constants.CONFD_REQS:
311
      raise errors.ConfdClientError("Invalid request type")
312

    
313

    
314
class ConfdFilterCallback:
315
  """Callback that calls another callback, but filters duplicate results.
316

317
  """
318
  def __init__(self, callback, logger=None):
319
    """Constructor for ConfdFilterCallback
320

321
    @type callback: f(L{ConfdUpcallPayload})
322
    @param callback: function to call when getting answers
323
    @type logger: logging.Logger
324
    @keyword logger: optional logger for internal conditions
325

326
    """
327
    if not callable(callback):
328
      raise errors.ProgrammerError("callback must be callable")
329

    
330
    self._callback = callback
331
    self._logger = logger
332
    # answers contains a dict of salt -> answer
333
    self._answers = {}
334

    
335
  def _LogFilter(self, salt, new_reply, old_reply):
336
    if not self._logger:
337
      return
338

    
339
    if new_reply.serial > old_reply.serial:
340
      self._logger.debug("Filtering confirming answer, with newer"
341
                         " serial for query %s" % salt)
342
    elif new_reply.serial == old_reply.serial:
343
      if new_reply.answer != old_reply.answer:
344
        self._logger.warning("Got incoherent answers for query %s"
345
                             " (serial: %s)" % (salt, new_reply.serial))
346
      else:
347
        self._logger.debug("Filtering confirming answer, with same"
348
                           " serial for query %s" % salt)
349
    else:
350
      self._logger.debug("Filtering outdated answer for query %s"
351
                         " serial: (%d < %d)" % (salt, old_reply.serial,
352
                                                 new_reply.serial))
353

    
354
  def _HandleExpire(self, up):
355
    # if we have no answer we have received none, before the expiration.
356
    if up.salt in self._answers:
357
      del self._answers[up.salt]
358

    
359
  def _HandleReply(self, up):
360
    """Handle a single confd reply, and decide whether to filter it.
361

362
    @rtype: boolean
363
    @return: True if the reply should be filtered, False if it should be passed
364
             on to the up-callback
365

366
    """
367
    filter_upcall = False
368
    salt = up.salt
369
    if salt not in self._answers:
370
      # first answer for a query (don't filter, and record)
371
      self._answers[salt] = up.server_reply
372
    elif up.server_reply.serial > self._answers[salt].serial:
373
      # newer answer (record, and compare contents)
374
      old_answer = self._answers[salt]
375
      self._answers[salt] = up.server_reply
376
      if up.server_reply.answer == old_answer.answer:
377
        # same content (filter) (version upgrade was unrelated)
378
        filter_upcall = True
379
        self._LogFilter(salt, up.server_reply, old_answer)
380
      # else: different content, pass up a second answer
381
    else:
382
      # older or same-version answer (duplicate or outdated, filter)
383
      filter_upcall = True
384
      self._LogFilter(salt, up.server_reply, self._answers[salt])
385

    
386
    return filter_upcall
387

    
388
  def __call__(self, up):
389
    """Filtering callback
390

391
    @type up: L{ConfdUpcallPayload}
392
    @param up: upper callback
393

394
    """
395
    filter_upcall = False
396
    if up.type == UPCALL_REPLY:
397
      filter_upcall = self._HandleReply(up)
398
    elif up.type == UPCALL_EXPIRE:
399
      self._HandleExpire(up)
400

    
401
    if not filter_upcall:
402
      self._callback(up)
403