Unify the “--backend-parameters” option
[ganeti-local] / lib / confd / client.py
1 #
2 #
3
4 # Copyright (C) 2009 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti confd client
23
24 """
25 import socket
26 import time
27 import random
28
29 from ganeti import utils
30 from ganeti import constants
31 from ganeti import objects
32 from ganeti import serializer
33 from ganeti import daemon # contains AsyncUDPSocket
34 from ganeti import errors
35 from ganeti import confd
36
37
38 class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
39   """Confd udp asyncore client
40
41   This is kept separate from the main ConfdClient to make sure it's easy to
42   implement a non-asyncore based client library.
43
44   """
45   def __init__(self, client):
46     """Constructor for ConfdAsyncUDPClient
47
48     @type client: L{ConfdClient}
49     @param client: client library, to pass the datagrams to
50
51     """
52     daemon.AsyncUDPSocket.__init__(self)
53     self.client = client
54
55   # this method is overriding a daemon.AsyncUDPSocket method
56   def handle_datagram(self, payload, ip, port):
57     self.client.HandleResponse(payload, ip, port)
58
59
60 class ConfdClient:
61   """Send queries to confd, and get back answers.
62
63   Since the confd model works by querying multiple master candidates, and
64   getting back answers, this is an asynchronous library. It can either work
65   through asyncore or with your own handling.
66
67   """
68   def __init__(self, hmac_key, peers):
69     """Constructor for ConfdClient
70
71     @type hmac_key: string
72     @param hmac_key: hmac key to talk to confd
73     @type peers: list
74     @param peers: list of peer nodes
75
76     """
77     if not isinstance(peers, list):
78       raise errors.ProgrammerError("peers must be a list")
79
80     self._peers = peers
81     self._hmac_key = hmac_key
82     self._socket = ConfdAsyncUDPClient(self)
83     self._callbacks = {}
84     self._expire_callbacks = []
85     self._confd_port = utils.GetDaemonPort(constants.CONFD)
86
87   def _PackRequest(self, request, now=None):
88     """Prepare a request to be sent on the wire.
89
90     This function puts a proper salt in a confd request, puts the proper salt,
91     and adds the correct magic number.
92
93     """
94     if now is None:
95       now = time.time()
96     tstamp = '%d' % now
97     req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
98     return confd.PackMagic(req)
99
100   def _UnpackReply(self, payload):
101     in_payload = confd.UnpackMagic(payload)
102     (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
103     return answer, salt
104
105   def _ExpireCallbacks(self):
106     """Delete all the expired callbacks.
107
108     """
109     now = time.time()
110     while self._expire_callbacks:
111       expire_time, rsalt = self._expire_callbacks[0]
112       if now >= expire_time:
113         self._expire_callbacks.pop()
114         del self._callbacks[rsalt]
115       else:
116         break
117
118   def SendRequest(self, request, callback, args, coverage=None):
119     """Send a confd request to some MCs
120
121     @type request: L{objects.ConfdRequest}
122     @param request: the request to send
123     @type callback: f(answer, req_type, req_query, salt, ip, port, args)
124     @param callback: answer callback
125     @type args: tuple
126     @param args: additional callback arguments
127     @type coverage: integer
128     @keyword coverage: number of remote nodes to contact
129
130     """
131     if coverage is None:
132       coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
133
134     if not callable(callback):
135       raise errors.ConfdClientError("callback must be callable")
136
137     if coverage > len(self._peers):
138       raise errors.ConfdClientError("Not enough MCs known to provide the"
139                                     " desired coverage")
140
141     if not request.rsalt:
142       raise errors.ConfdClientError("Missing request rsalt")
143
144     self._ExpireCallbacks()
145     if request.rsalt in self._callbacks:
146       raise errors.ConfdClientError("Duplicate request rsalt")
147
148     if request.type not in constants.CONFD_REQS:
149       raise errors.ConfdClientError("Invalid request type")
150
151     random.shuffle(self._peers)
152     targets = self._peers[:coverage]
153
154     now = time.time()
155     payload = self._PackRequest(request, now=now)
156
157     for target in targets:
158       try:
159         self._socket.enqueue_send(target, self._confd_port, payload)
160       except errors.UdpDataSizeError:
161         raise errors.ConfdClientError("Request too big")
162
163     self._callbacks[request.rsalt] = (callback, request.type,
164                                       request.query, args)
165     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
166     self._expire_callbacks.append((expire_time, request.rsalt))
167
168   def HandleResponse(self, payload, ip, port):
169     """Asynchronous handler for a confd reply
170
171     Call the relevant callback associated to the current request.
172
173     """
174     try:
175       try:
176         answer, salt = self._UnpackReply(payload)
177       except (errors.SignatureError, errors.ConfdMagicError):
178         return
179
180       try:
181         (callback, type, query, args) = self._callbacks[salt]
182       except KeyError:
183         # If the salt is unkown the answer is probably a replay of an old
184         # expired query. Ignoring it.
185         pass
186       else:
187         callback(answer, type, query, salt, ip, port, args)
188
189     finally:
190       self._ExpireCallbacks()
191
192
193 class ConfdClientRequest(objects.ConfdRequest):
194   """This is the client-side version of ConfdRequest.
195
196   This version of the class helps creating requests, on the client side, by
197   filling in some default values.
198
199   """
200   def __init__(self, **kwargs):
201     objects.ConfdRequest.__init__(self, **kwargs)
202     if not self.rsalt:
203       self.rsalt = utils.NewUUID()
204     if not self.protocol:
205       self.protocol = constants.CONFD_PROTOCOL_VERSION
206     if self.type not in constants.CONFD_REQS:
207       raise errors.ConfdClientError("Invalid request type")
208