Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 033d58b0

History | View | Annotate | Download (8.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
"""
25
import socket
26
import time
27
import random
28

    
29
from ganeti import utils
30
from ganeti import constants
31
from ganeti import objects
32
from ganeti import serializer
33
from ganeti import daemon # contains AsyncUDPSocket
34
from ganeti import errors
35
from ganeti import confd
36

    
37

    
38
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39
  """Confd udp asyncore client
40

41
  This is kept separate from the main ConfdClient to make sure it's easy to
42
  implement a non-asyncore based client library.
43

44
  """
45
  def __init__(self, client):
46
    """Constructor for ConfdAsyncUDPClient
47

48
    @type client: L{ConfdClient}
49
    @param client: client library, to pass the datagrams to
50

51
    """
52
    daemon.AsyncUDPSocket.__init__(self)
53
    self.client = client
54

    
55
  # this method is overriding a daemon.AsyncUDPSocket method
56
  def handle_datagram(self, payload, ip, port):
57
    self.client.HandleResponse(payload, ip, port)
58

    
59

    
60
class ConfdClient:
61
  """Send queries to confd, and get back answers.
62

63
  Since the confd model works by querying multiple master candidates, and
64
  getting back answers, this is an asynchronous library. It can either work
65
  through asyncore or with your own handling.
66

67
  """
68
  def __init__(self, hmac_key, peers, callback, port=None, logger=None):
69
    """Constructor for ConfdClient
70

71
    @type hmac_key: string
72
    @param hmac_key: hmac key to talk to confd
73
    @type peers: list
74
    @param peers: list of peer nodes
75
    @type callback: f(L{ConfdUpcallPayload})
76
    @param callback: function to call when getting answers
77
    @type port: integer
78
    @keyword port: confd port (default: use GetDaemonPort)
79
    @type logger: L{logging.Logger}
80
    @keyword logger: optional logger for internal conditions
81

82
    """
83
    if not isinstance(peers, list):
84
      raise errors.ProgrammerError("peers must be a list")
85
    if not callable(callback):
86
      raise errors.ProgrammerError("callback must be callable")
87

    
88
    self._peers = peers
89
    self._hmac_key = hmac_key
90
    self._socket = ConfdAsyncUDPClient(self)
91
    self._callback = callback
92
    self._confd_port = port
93
    self._logger = logger
94
    self._requests = {}
95
    self._expire_requests = []
96

    
97
    if self._confd_port is None:
98
      self._confd_port = utils.GetDaemonPort(constants.CONFD)
99

    
100
  def _PackRequest(self, request, now=None):
101
    """Prepare a request to be sent on the wire.
102

103
    This function puts a proper salt in a confd request, puts the proper salt,
104
    and adds the correct magic number.
105

106
    """
107
    if now is None:
108
      now = time.time()
109
    tstamp = '%d' % now
110
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
111
    return confd.PackMagic(req)
112

    
113
  def _UnpackReply(self, payload):
114
    in_payload = confd.UnpackMagic(payload)
115
    (dict_answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
116
    answer = objects.ConfdReply.FromDict(dict_answer)
117
    return answer, salt
118

    
119
  def ExpireRequests(self):
120
    """Delete all the expired requests.
121

122
    """
123
    now = time.time()
124
    while self._expire_requests:
125
      expire_time, rsalt = self._expire_requests[0]
126
      if now >= expire_time:
127
        self._expire_requests.pop(0)
128
        (request, args) = self._requests[rsalt]
129
        del self._requests[rsalt]
130
        client_reply = ConfdUpcallPayload(salt=rsalt,
131
                                          type=UPCALL_EXPIRE,
132
                                          orig_request=request,
133
                                          extra_args=args)
134
        self._callback(client_reply)
135
      else:
136
        break
137

    
138
  def SendRequest(self, request, args=None, coverage=None):
139
    """Send a confd request to some MCs
140

141
    @type request: L{objects.ConfdRequest}
142
    @param request: the request to send
143
    @type args: tuple
144
    @keyword args: additional callback arguments
145
    @type coverage: integer
146
    @keyword coverage: number of remote nodes to contact
147

148
    """
149
    if coverage is None:
150
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
151

    
152
    if coverage > len(self._peers):
153
      raise errors.ConfdClientError("Not enough MCs known to provide the"
154
                                    " desired coverage")
155

    
156
    if not request.rsalt:
157
      raise errors.ConfdClientError("Missing request rsalt")
158

    
159
    self.ExpireRequests()
160
    if request.rsalt in self._requests:
161
      raise errors.ConfdClientError("Duplicate request rsalt")
162

    
163
    if request.type not in constants.CONFD_REQS:
164
      raise errors.ConfdClientError("Invalid request type")
165

    
166
    random.shuffle(self._peers)
167
    targets = self._peers[:coverage]
168

    
169
    now = time.time()
170
    payload = self._PackRequest(request, now=now)
171

    
172
    for target in targets:
173
      try:
174
        self._socket.enqueue_send(target, self._confd_port, payload)
175
      except errors.UdpDataSizeError:
176
        raise errors.ConfdClientError("Request too big")
177

    
178
    self._requests[request.rsalt] = (request, args)
179
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
180
    self._expire_requests.append((expire_time, request.rsalt))
181

    
182
  def HandleResponse(self, payload, ip, port):
183
    """Asynchronous handler for a confd reply
184

185
    Call the relevant callback associated to the current request.
186

187
    """
188
    try:
189
      try:
190
        answer, salt = self._UnpackReply(payload)
191
      except (errors.SignatureError, errors.ConfdMagicError), err:
192
        if self._logger:
193
          self._logger.debug("Discarding broken package: %s" % err)
194
        return
195

    
196
      try:
197
        (request, args) = self._requests[salt]
198
      except KeyError:
199
        if self._logger:
200
          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
201
        return
202

    
203
      client_reply = ConfdUpcallPayload(salt=salt,
204
                                        type=UPCALL_REPLY,
205
                                        server_reply=answer,
206
                                        orig_request=request,
207
                                        server_ip=ip,
208
                                        server_port=port,
209
                                        extra_args=args)
210
      self._callback(client_reply)
211

    
212
    finally:
213
      self.ExpireRequests()
214

    
215

    
216
# UPCALL_REPLY: server reply upcall
217
# has all ConfdUpcallPayload fields populated
218
UPCALL_REPLY = 1
219
# UPCALL_EXPIRE: internal library request expire
220
# has only salt, type, orig_request and extra_args
221
UPCALL_EXPIRE = 2
222
CONFD_UPCALL_TYPES = frozenset([
223
  UPCALL_REPLY,
224
  UPCALL_EXPIRE,
225
  ])
226

    
227

    
228
class ConfdUpcallPayload(objects.ConfigObject):
229
  """Callback argument for confd replies
230

231
  @type salt: string
232
  @ivar salt: salt associated with the query
233
  @type type: one of confd.client.CONFD_UPCALL_TYPES
234
  @ivar type: upcall type (server reply, expired request, ...)
235
  @type orig_request: L{objects.ConfdRequest}
236
  @ivar orig_request: original request
237
  @type server_reply: L{objects.ConfdReply}
238
  @ivar server_reply: server reply
239
  @type server_ip: string
240
  @ivar server_ip: answering server ip address
241
  @type server_port: int
242
  @ivar server_port: answering server port
243
  @type extra_args: any
244
  @ivar extra_args: 'args' argument of the SendRequest function
245

246
  """
247
  __slots__ = [
248
    "salt",
249
    "type",
250
    "orig_request",
251
    "server_reply",
252
    "server_ip",
253
    "server_port",
254
    "extra_args",
255
    ]
256

    
257

    
258
class ConfdClientRequest(objects.ConfdRequest):
259
  """This is the client-side version of ConfdRequest.
260

261
  This version of the class helps creating requests, on the client side, by
262
  filling in some default values.
263

264
  """
265
  def __init__(self, **kwargs):
266
    objects.ConfdRequest.__init__(self, **kwargs)
267
    if not self.rsalt:
268
      self.rsalt = utils.NewUUID()
269
    if not self.protocol:
270
      self.protocol = constants.CONFD_PROTOCOL_VERSION
271
    if self.type not in constants.CONFD_REQS:
272
      raise errors.ConfdClientError("Invalid request type")
273