Confd client library: enable optional logging
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 """
25 import socket
26 import time
27 import random
28
29 from ganeti import utils
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import serializer
33 from ganeti import daemon # contains AsyncUDPSocket
34 from ganeti import errors
35 from ganeti import confd
36
37
38 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39   """Confd udp asyncore client
40
41   This is kept separate from the main ConfdClient to make sure it's easy to
42   implement a non-asyncore based client library.
43
44   """
45   def __init__(self, client):
46     """Constructor for ConfdAsyncUDPClient
47
48     @type client: L{ConfdClient}
49     @param client: client library, to pass the datagrams to
50
51     """
52     daemon.AsyncUDPSocket.__init__(self)
53     self.client = client
54
55   # this method is overriding a daemon.AsyncUDPSocket method
56   def handle_datagram(self, payload, ip, port):
57     self.client.HandleResponse(payload, ip, port)
58
59
60 class ConfdClient:
61   """Send queries to confd, and get back answers.
62
63   Since the confd model works by querying multiple master candidates, and
64   getting back answers, this is an asynchronous library. It can either work
65   through asyncore or with your own handling.
66
67   """
68   def __init__(self, hmac_key, peers, callback, port=None, logger=None):
69     """Constructor for ConfdClient
70
71     @type hmac_key: string
72     @param hmac_key: hmac key to talk to confd
73     @type peers: list
74     @param peers: list of peer nodes
75     @type callback: f(L{ConfdUpcallPayload})
76     @param callback: function to call when getting answers
77     @type port: integer
78     @keyword port: confd port (default: use GetDaemonPort)
79     @type logger: L{logging.Logger}
80     @keyword logger: optional logger for internal conditions
81
82     """
83     if not isinstance(peers, list):
84       raise errors.ProgrammerError("peers must be a list")
85     if not callable(callback):
86       raise errors.ProgrammerError("callback must be callable")
87
88     self._peers = peers
89     self._hmac_key = hmac_key
90     self._socket = ConfdAsyncUDPClient(self)
91     self._callback = callback
92     self._confd_port = port
93     self._logger = logger
94     self._requests = {}
95     self._expire_requests = []
96
97     if self._confd_port is None:
98       self._confd_port = utils.GetDaemonPort(constants.CONFD)
99
100   def _PackRequest(self, request, now=None):
101     """Prepare a request to be sent on the wire.
102
103     This function puts a proper salt in a confd request, puts the proper salt,
104     and adds the correct magic number.
105
106     """
107     if now is None:
108       now = time.time()
109     tstamp = '%d' % now
110     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
111     return confd.PackMagic(req)
112
113   def _UnpackReply(self, payload):
114     in_payload = confd.UnpackMagic(payload)
115     (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
116     return answer, salt
117
118   def ExpireRequests(self):
119     """Delete all the expired requests.
120
121     """
122     now = time.time()
123     while self._expire_requests:
124       expire_time, rsalt = self._expire_requests[0]
125       if now >= expire_time:
126         self._expire_requests.pop(0)
127         (request, args) = self._requests[rsalt]
128         del self._requests[rsalt]
129         client_reply = ConfdUpcallPayload(salt=rsalt,
130                                           type=UPCALL_EXPIRE,
131                                           orig_request=request,
132                                           extra_args=args)
133         self._callback(client_reply)
134       else:
135         break
136
137   def SendRequest(self, request, args=None, coverage=None):
138     """Send a confd request to some MCs
139
140     @type request: L{objects.ConfdRequest}
141     @param request: the request to send
142     @type args: tuple
143     @keyword args: additional callback arguments
144     @type coverage: integer
145     @keyword coverage: number of remote nodes to contact
146
147     """
148     if coverage is None:
149       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
150
151     if coverage > len(self._peers):
152       raise errors.ConfdClientError("Not enough MCs known to provide the"
153                                     " desired coverage")
154
155     if not request.rsalt:
156       raise errors.ConfdClientError("Missing request rsalt")
157
158     self.ExpireRequests()
159     if request.rsalt in self._requests:
160       raise errors.ConfdClientError("Duplicate request rsalt")
161
162     if request.type not in constants.CONFD_REQS:
163       raise errors.ConfdClientError("Invalid request type")
164
165     random.shuffle(self._peers)
166     targets = self._peers[:coverage]
167
168     now = time.time()
169     payload = self._PackRequest(request, now=now)
170
171     for target in targets:
172       try:
173         self._socket.enqueue_send(target, self._confd_port, payload)
174       except errors.UdpDataSizeError:
175         raise errors.ConfdClientError("Request too big")
176
177     self._requests[request.rsalt] = (request, args)
178     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
179     self._expire_requests.append((expire_time, request.rsalt))
180
181   def HandleResponse(self, payload, ip, port):
182     """Asynchronous handler for a confd reply
183
184     Call the relevant callback associated to the current request.
185
186     """
187     try:
188       try:
189         answer, salt = self._UnpackReply(payload)
190       except (errors.SignatureError, errors.ConfdMagicError), err:
191         if self._logger:
192           self._logger.debug("Discarding broken package: %s" % err)
193         return
194
195       try:
196         (request, args) = self._requests[salt]
197       except KeyError:
198         if self._logger:
199           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
200         return
201
202       client_reply = ConfdUpcallPayload(salt=salt,
203                                         type=UPCALL_REPLY,
204                                         server_reply=answer,
205                                         orig_request=request,
206                                         server_ip=ip,
207                                         server_port=port,
208                                         extra_args=args)
209       self._callback(client_reply)
210
211     finally:
212       self.ExpireRequests()
213
214
215 # UPCALL_REPLY: server reply upcall
216 # has all ConfdUpcallPayload fields populated
217 UPCALL_REPLY = 1
218 # UPCALL_EXPIRE: internal library request expire
219 # has only salt, type, orig_request and extra_args
220 UPCALL_EXPIRE = 2
221 CONFD_UPCALL_TYPES = frozenset([
222   UPCALL_REPLY,
223   UPCALL_EXPIRE,
224   ])
225
226
227 class ConfdUpcallPayload(objects.ConfigObject):
228   """Callback argument for confd replies
229
230   @type salt: string
231   @ivar salt: salt associated with the query
232   @type type: one of confd.client.CONFD_UPCALL_TYPES
233   @ivar type: upcall type (server reply, expired request, ...)
234   @type orig_request: L{objects.ConfdRequest}
235   @ivar orig_request: original request
236   @type server_reply: L{objects.ConfdReply}
237   @ivar server_reply: server reply
238   @type server_ip: string
239   @ivar server_ip: answering server ip address
240   @type server_port: int
241   @ivar server_port: answering server port
242   @type extra_args: any
243   @ivar extra_args: 'args' argument of the SendRequest function
244
245   """
246   __slots__ = [
247     "salt",
248     "type",
249     "orig_request",
250     "server_reply",
251     "server_ip",
252     "server_port",
253     "extra_args",
254     ]
255
256
257 class ConfdClientRequest(objects.ConfdRequest):
258   """This is the client-side version of ConfdRequest.
259
260   This version of the class helps creating requests, on the client side, by
261   filling in some default values.
262
263   """
264   def __init__(self, **kwargs):
265     objects.ConfdRequest.__init__(self, **kwargs)
266     if not self.rsalt:
267       self.rsalt = utils.NewUUID()
268     if not self.protocol:
269       self.protocol = constants.CONFD_PROTOCOL_VERSION
270     if self.type not in constants.CONFD_REQS:
271       raise errors.ConfdClientError("Invalid request type")
272