Revision e4ccf6cd

b/Makefile.am
130 130

  
131 131
confd_PYTHON = \
132 132
	lib/confd/__init__.py \
133
	lib/confd/client.py \
133 134
	lib/confd/server.py \
134 135
	lib/confd/querylib.py
135 136

  
b/lib/confd/client.py
1
#
2
#
3

  
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Ganeti confd client
23

  
24
"""
25
import socket
26
import time
27
import random
28

  
29
from ganeti import utils
30
from ganeti import constants
31
from ganeti import objects
32
from ganeti import serializer
33
from ganeti import daemon # contains AsyncUDPSocket
34
from ganeti import errors
35
from ganeti import confd
36

  
37

  
38
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39
  """Confd udp asyncore client
40

  
41
  This is kept separate from the main ConfdClient to make sure it's easy to
42
  implement a non-asyncore based client library.
43

  
44
  """
45
  def __init__(self, client):
46
    """Constructor for ConfdAsyncUDPClient
47

  
48
    @type client: L{ConfdClient}
49
    @param client: client library, to pass the datagrams to
50

  
51
    """
52
    daemon.AsyncUDPSocket.__init__(self)
53
    self.client = client
54

  
55
  # this method is overriding a daemon.AsyncUDPSocket method
56
  def handle_datagram(self, payload, ip, port):
57
    self.client.HandleResponse(payload, ip, port)
58

  
59

  
60
class ConfdClient:
61
  """Send queries to confd, and get back answers.
62

  
63
  Since the confd model works by querying multiple master candidates, and
64
  getting back answers, this is an asynchronous library. It can either work
65
  through asyncore or with your own handling.
66

  
67
  """
68
  def __init__(self, hmac_key, peers):
69
    """Constructor for ConfdClient
70

  
71
    @type hmac_key: string
72
    @param hmac_key: hmac key to talk to confd
73
    @type peers: list
74
    @param peers: list of peer nodes
75

  
76
    """
77
    if not isinstance(peers, list):
78
      raise errors.ProgrammerError("peers must be a list")
79

  
80
    self._peers = peers
81
    self._hmac_key = hmac_key
82
    self._socket = ConfdAsyncUDPClient(self)
83
    self._callbacks = {}
84
    self._expire_callbacks = []
85
    self._confd_port = utils.GetDaemonPort(constants.CONFD)
86

  
87
  def _PackRequest(self, request, now=None):
88
    """Prepare a request to be sent on the wire.
89

  
90
    This function puts a proper salt in a confd request, puts the proper salt,
91
    and adds the correct magic number.
92

  
93
    """
94
    if now is None:
95
      now = time.time()
96
    tstamp = '%d' % now
97
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
98
    return confd.PackMagic(req)
99

  
100
  def _UnpackReply(self, payload):
101
    in_payload = confd.UnpackMagic(payload)
102
    (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
103
    return answer, salt
104

  
105
  def _ExpireCallbacks(self):
106
    """Delete all the expired callbacks.
107

  
108
    """
109
    now = time.time()
110
    while self._expire_callbacks:
111
      expire_time, rsalt = self._expire_callbacks[0]
112
      if now >= expire_time:
113
        self._expire_callbacks.pop()
114
        del self._callbacks[rsalt]
115
      else:
116
        break
117

  
118
  def SendRequest(self, request, callback, args, coverage=None):
119
    """Send a confd request to some MCs
120

  
121
    @type request: L{objects.ConfdRequest}
122
    @param request: the request to send
123
    @type callback: f(answer, req_type, req_query, salt, ip, port, args)
124
    @param callback: answer callback
125
    @type args: tuple
126
    @param args: additional callback arguments
127
    @type coverage: integer
128
    @keyword coverage: number of remote nodes to contact
129

  
130
    """
131
    if coverage is None:
132
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
133

  
134
    if not callable(callback):
135
      raise errors.ConfdClientError("callback must be callable")
136

  
137
    if coverage > len(self._peers):
138
      raise errors.ConfdClientError("Not enough MCs known to provide the"
139
                                    " desired coverage")
140

  
141
    if not request.rsalt:
142
      raise errors.ConfdClientError("Missing request rsalt")
143

  
144
    self._ExpireCallbacks()
145
    if request.rsalt in self._callbacks:
146
      raise errors.ConfdClientError("Duplicate request rsalt")
147

  
148
    if request.type not in constants.CONFD_REQS:
149
      raise errors.ConfdClientError("Invalid request type")
150

  
151
    random.shuffle(self._peers)
152
    targets = self._peers[:coverage]
153

  
154
    now = time.time()
155
    payload = self._PackRequest(request, now=now)
156

  
157
    for target in targets:
158
      try:
159
        self._socket.enqueue_send(target, self._confd_port, payload)
160
      except errors.UdpDataSizeError:
161
        raise errors.ConfdClientError("Request too big")
162

  
163
    self._callbacks[request.rsalt] = (callback, request.type,
164
                                      request.query, args)
165
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
166
    self._expire_callbacks.append((expire_time, request.rsalt))
167

  
168
  def HandleResponse(self, payload, ip, port):
169
    """Asynchronous handler for a confd reply
170

  
171
    Call the relevant callback associated to the current request.
172

  
173
    """
174
    try:
175
      try:
176
        answer, salt = self._UnpackReply(payload)
177
      except (errors.SignatureError, errors.ConfdMagicError):
178
        return
179

  
180
      try:
181
        (callback, type, query, args) = self._callbacks[salt]
182
      except KeyError:
183
        # If the salt is unkown the answer is probably a replay of an old
184
        # expired query. Ignoring it.
185
        pass
186
      else:
187
        callback(answer, type, query, salt, ip, port, args)
188

  
189
    finally:
190
      self._ExpireCallbacks()
191

  
192

  
193
class ConfdClientRequest(objects.ConfdRequest):
194
  """This is the client-side version of ConfdRequest.
195

  
196
  This version of the class helps creating requests, on the client side, by
197
  filling in some default values.
198

  
199
  """
200
  def __init__(self, **kwargs):
201
    objects.ConfdRequest.__init__(self, **kwargs)
202
    if not self.rsalt:
203
      self.rsalt = utils.NewUUID()
204
    if not self.protocol:
205
      self.protocol = constants.CONFD_PROTOCOL_VERSION
206
    if self.type not in constants.CONFD_REQS:
207
      raise errors.ConfdClientError("Invalid request type")
208

  
b/lib/constants.py
688 688
# compressed, or move away from json.
689 689
CONFD_MAGIC_FOURCC = 'plj0'
690 690

  
691
# By default a confd request is sent to the minimum between this number and all
692
# MCs. 6 was chosen because even in the case of a disastrous 50% response rate,
693
# we should have enough answers to be able to compare more than one.
694
CONFD_DEFAULT_REQ_COVERAGE = 6
695

  
696
# Timeout in seconds to expire pending query request in the confd client
697
# library. We don't actually expect any answer more than 10 seconds after we
698
# sent a request.
699
CONFD_CLIENT_EXPIRE_TIMEOUT = 10
700

  
691 701
# Maximum UDP datagram size.
692 702
# On IPv4: 64K - 20 (ip header size) - 8 (udp header size) = 65507
693 703
# On IPv6: 64K - 40 (ip6 header size) - 8 (udp header size) = 65487
b/lib/errors.py
305 305
  """
306 306

  
307 307

  
308
class ConfdClientError(GenericError):
309
  """A magic fourcc error in Ganeti confd.
310

  
311
  Errors in the confd client library.
312

  
313
  """
314

  
315

  
308 316
class UdpDataSizeError(GenericError):
309 317
  """UDP payload too big.
310 318

  

Also available in: Unified diff