Statistics
| Branch: | Tag: | Revision:

root / lib / confd / client.py @ 90469357

History | View | Annotate | Download (6.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2009 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti confd client
23

24
"""
25
import socket
26
import time
27
import random
28

    
29
from ganeti import utils
30
from ganeti import constants
31
from ganeti import objects
32
from ganeti import serializer
33
from ganeti import daemon # contains AsyncUDPSocket
34
from ganeti import errors
35
from ganeti import confd
36

    
37

    
38
class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39
  """Confd udp asyncore client
40

41
  This is kept separate from the main ConfdClient to make sure it's easy to
42
  implement a non-asyncore based client library.
43

44
  """
45
  def __init__(self, client):
46
    """Constructor for ConfdAsyncUDPClient
47

48
    @type client: L{ConfdClient}
49
    @param client: client library, to pass the datagrams to
50

51
    """
52
    daemon.AsyncUDPSocket.__init__(self)
53
    self.client = client
54

    
55
  # this method is overriding a daemon.AsyncUDPSocket method
56
  def handle_datagram(self, payload, ip, port):
57
    self.client.HandleResponse(payload, ip, port)
58

    
59

    
60
class ConfdClient:
61
  """Send queries to confd, and get back answers.
62

63
  Since the confd model works by querying multiple master candidates, and
64
  getting back answers, this is an asynchronous library. It can either work
65
  through asyncore or with your own handling.
66

67
  """
68
  def __init__(self, hmac_key, peers):
69
    """Constructor for ConfdClient
70

71
    @type hmac_key: string
72
    @param hmac_key: hmac key to talk to confd
73
    @type peers: list
74
    @param peers: list of peer nodes
75

76
    """
77
    if not isinstance(peers, list):
78
      raise errors.ProgrammerError("peers must be a list")
79

    
80
    self._peers = peers
81
    self._hmac_key = hmac_key
82
    self._socket = ConfdAsyncUDPClient(self)
83
    self._callbacks = {}
84
    self._expire_callbacks = []
85
    self._confd_port = utils.GetDaemonPort(constants.CONFD)
86

    
87
  def _PackRequest(self, request, now=None):
88
    """Prepare a request to be sent on the wire.
89

90
    This function puts a proper salt in a confd request, puts the proper salt,
91
    and adds the correct magic number.
92

93
    """
94
    if now is None:
95
      now = time.time()
96
    tstamp = '%d' % now
97
    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
98
    return confd.PackMagic(req)
99

    
100
  def _UnpackReply(self, payload):
101
    in_payload = confd.UnpackMagic(payload)
102
    (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
103
    return answer, salt
104

    
105
  def _ExpireCallbacks(self):
106
    """Delete all the expired callbacks.
107

108
    """
109
    now = time.time()
110
    while self._expire_callbacks:
111
      expire_time, rsalt = self._expire_callbacks[0]
112
      if now >= expire_time:
113
        self._expire_callbacks.pop()
114
        del self._callbacks[rsalt]
115
      else:
116
        break
117

    
118
  def SendRequest(self, request, callback, args=None, coverage=None):
119
    """Send a confd request to some MCs
120

121
    @type request: L{objects.ConfdRequest}
122
    @param request: the request to send
123
    @type callback: f(answer, req_type, req_query, salt, ip, port, args)
124
    @param callback: answer callback
125
    @type args: tuple
126
    @keyword args: additional callback arguments
127
    @type coverage: integer
128
    @keyword coverage: number of remote nodes to contact
129

130
    """
131
    if coverage is None:
132
      coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
133

    
134
    if not callable(callback):
135
      raise errors.ConfdClientError("callback must be callable")
136

    
137
    if coverage > len(self._peers):
138
      raise errors.ConfdClientError("Not enough MCs known to provide the"
139
                                    " desired coverage")
140

    
141
    if not request.rsalt:
142
      raise errors.ConfdClientError("Missing request rsalt")
143

    
144
    self._ExpireCallbacks()
145
    if request.rsalt in self._callbacks:
146
      raise errors.ConfdClientError("Duplicate request rsalt")
147

    
148
    if request.type not in constants.CONFD_REQS:
149
      raise errors.ConfdClientError("Invalid request type")
150

    
151
    random.shuffle(self._peers)
152
    targets = self._peers[:coverage]
153

    
154
    now = time.time()
155
    payload = self._PackRequest(request, now=now)
156

    
157
    for target in targets:
158
      try:
159
        self._socket.enqueue_send(target, self._confd_port, payload)
160
      except errors.UdpDataSizeError:
161
        raise errors.ConfdClientError("Request too big")
162

    
163
    self._callbacks[request.rsalt] = (callback, request.type,
164
                                      request.query, args)
165
    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
166
    self._expire_callbacks.append((expire_time, request.rsalt))
167

    
168
  def HandleResponse(self, payload, ip, port):
169
    """Asynchronous handler for a confd reply
170

171
    Call the relevant callback associated to the current request.
172

173
    """
174
    try:
175
      try:
176
        answer, salt = self._UnpackReply(payload)
177
      except (errors.SignatureError, errors.ConfdMagicError):
178
        return
179

    
180
      try:
181
        (callback, type, query, args) = self._callbacks[salt]
182
      except KeyError:
183
        # If the salt is unkown the answer is probably a replay of an old
184
        # expired query. Ignoring it.
185
        pass
186
      else:
187
        callback(answer, type, query, salt, ip, port, args)
188

    
189
    finally:
190
      self._ExpireCallbacks()
191

    
192

    
193
class ConfdClientRequest(objects.ConfdRequest):
194
  """This is the client-side version of ConfdRequest.
195

196
  This version of the class helps creating requests, on the client side, by
197
  filling in some default values.
198

199
  """
200
  def __init__(self, **kwargs):
201
    objects.ConfdRequest.__init__(self, **kwargs)
202
    if not self.rsalt:
203
      self.rsalt = utils.NewUUID()
204
    if not self.protocol:
205
      self.protocol = constants.CONFD_PROTOCOL_VERSION
206
    if self.type not in constants.CONFD_REQS:
207
      raise errors.ConfdClientError("Invalid request type")
208