Revision 96e03b0b
b/lib/confd/client.py | ||
---|---|---|
65 | 65 |
through asyncore or with your own handling. |
66 | 66 |
|
67 | 67 |
""" |
68 |
def __init__(self, hmac_key, peers): |
|
68 |
def __init__(self, hmac_key, peers, callback):
|
|
69 | 69 |
"""Constructor for ConfdClient |
70 | 70 |
|
71 | 71 |
@type hmac_key: string |
72 | 72 |
@param hmac_key: hmac key to talk to confd |
73 | 73 |
@type peers: list |
74 | 74 |
@param peers: list of peer nodes |
75 |
@type callback: f(L{ConfdUpcallPayload}) |
|
76 |
@param callback: function to call when getting answers |
|
75 | 77 |
|
76 | 78 |
""" |
77 | 79 |
if not isinstance(peers, list): |
78 | 80 |
raise errors.ProgrammerError("peers must be a list") |
81 |
if not callable(callback): |
|
82 |
raise errors.ProgrammerError("callback must be callable") |
|
79 | 83 |
|
80 | 84 |
self._peers = peers |
81 | 85 |
self._hmac_key = hmac_key |
82 | 86 |
self._socket = ConfdAsyncUDPClient(self) |
83 |
self._callbacks = {} |
|
84 |
self._expire_callbacks = [] |
|
87 |
self._callback = callback |
|
88 |
self._requests = {} |
|
89 |
self._expire_requests = [] |
|
85 | 90 |
self._confd_port = utils.GetDaemonPort(constants.CONFD) |
86 | 91 |
|
87 | 92 |
def _PackRequest(self, request, now=None): |
... | ... | |
102 | 107 |
(answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) |
103 | 108 |
return answer, salt |
104 | 109 |
|
105 |
def _ExpireCallbacks(self):
|
|
106 |
"""Delete all the expired callbacks.
|
|
110 |
def ExpireRequests(self):
|
|
111 |
"""Delete all the expired requests.
|
|
107 | 112 |
|
108 | 113 |
""" |
109 | 114 |
now = time.time() |
110 |
while self._expire_callbacks:
|
|
111 |
expire_time, rsalt = self._expire_callbacks[0]
|
|
115 |
while self._expire_requests:
|
|
116 |
expire_time, rsalt = self._expire_requests[0]
|
|
112 | 117 |
if now >= expire_time: |
113 |
self._expire_callbacks.pop() |
|
114 |
del self._callbacks[rsalt] |
|
118 |
self._expire_requests.pop(0) |
|
119 |
(request, args) = self._requests[rsalt] |
|
120 |
del self._requests[rsalt] |
|
121 |
client_reply = ConfdUpcallPayload(salt=rsalt, |
|
122 |
type=UPCALL_EXPIRE, |
|
123 |
orig_request=request, |
|
124 |
extra_args=args) |
|
125 |
self._callback(client_reply) |
|
115 | 126 |
else: |
116 | 127 |
break |
117 | 128 |
|
118 |
def SendRequest(self, request, callback, args=None, coverage=None):
|
|
129 |
def SendRequest(self, request, args=None, coverage=None): |
|
119 | 130 |
"""Send a confd request to some MCs |
120 | 131 |
|
121 | 132 |
@type request: L{objects.ConfdRequest} |
122 | 133 |
@param request: the request to send |
123 |
@type callback: f(answer, req_type, req_query, salt, ip, port, args) |
|
124 |
@param callback: answer callback |
|
125 | 134 |
@type args: tuple |
126 | 135 |
@keyword args: additional callback arguments |
127 | 136 |
@type coverage: integer |
... | ... | |
131 | 140 |
if coverage is None: |
132 | 141 |
coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) |
133 | 142 |
|
134 |
if not callable(callback): |
|
135 |
raise errors.ConfdClientError("callback must be callable") |
|
136 |
|
|
137 | 143 |
if coverage > len(self._peers): |
138 | 144 |
raise errors.ConfdClientError("Not enough MCs known to provide the" |
139 | 145 |
" desired coverage") |
... | ... | |
141 | 147 |
if not request.rsalt: |
142 | 148 |
raise errors.ConfdClientError("Missing request rsalt") |
143 | 149 |
|
144 |
self._ExpireCallbacks()
|
|
145 |
if request.rsalt in self._callbacks:
|
|
150 |
self.ExpireRequests()
|
|
151 |
if request.rsalt in self._requests:
|
|
146 | 152 |
raise errors.ConfdClientError("Duplicate request rsalt") |
147 | 153 |
|
148 | 154 |
if request.type not in constants.CONFD_REQS: |
... | ... | |
160 | 166 |
except errors.UdpDataSizeError: |
161 | 167 |
raise errors.ConfdClientError("Request too big") |
162 | 168 |
|
163 |
self._callbacks[request.rsalt] = (callback, request.type, |
|
164 |
request.query, args) |
|
169 |
self._requests[request.rsalt] = (request, args) |
|
165 | 170 |
expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT |
166 |
self._expire_callbacks.append((expire_time, request.rsalt))
|
|
171 |
self._expire_requests.append((expire_time, request.rsalt))
|
|
167 | 172 |
|
168 | 173 |
def HandleResponse(self, payload, ip, port): |
169 | 174 |
"""Asynchronous handler for a confd reply |
... | ... | |
178 | 183 |
return |
179 | 184 |
|
180 | 185 |
try: |
181 |
(callback, type, query, args) = self._callbacks[salt]
|
|
186 |
(request, args) = self._requests[salt]
|
|
182 | 187 |
except KeyError: |
183 | 188 |
# If the salt is unkown the answer is probably a replay of an old |
184 | 189 |
# expired query. Ignoring it. |
185 |
pass |
|
186 |
else: |
|
187 |
callback(answer, type, query, salt, ip, port, args) |
|
190 |
return |
|
191 |
|
|
192 |
client_reply = ConfdUpcallPayload(salt=salt, |
|
193 |
type=UPCALL_REPLY, |
|
194 |
server_reply=answer, |
|
195 |
orig_request=request, |
|
196 |
server_ip=ip, |
|
197 |
server_port=port, |
|
198 |
extra_args=args) |
|
199 |
self._callback(client_reply) |
|
188 | 200 |
|
189 | 201 |
finally: |
190 |
self._ExpireCallbacks() |
|
202 |
self.ExpireRequests() |
|
203 |
|
|
204 |
|
|
205 |
# UPCALL_REPLY: server reply upcall |
|
206 |
# has all ConfdUpcallPayload fields populated |
|
207 |
UPCALL_REPLY = 1 |
|
208 |
# UPCALL_EXPIRE: internal library request expire |
|
209 |
# has only salt, type, orig_request and extra_args |
|
210 |
UPCALL_EXPIRE = 2 |
|
211 |
CONFD_UPCALL_TYPES = frozenset([ |
|
212 |
UPCALL_REPLY, |
|
213 |
UPCALL_EXPIRE, |
|
214 |
]) |
|
215 |
|
|
216 |
|
|
217 |
class ConfdUpcallPayload(objects.ConfigObject): |
|
218 |
"""Callback argument for confd replies |
|
219 |
|
|
220 |
@type salt: string |
|
221 |
@ivar salt: salt associated with the query |
|
222 |
@type type: one of confd.client.CONFD_UPCALL_TYPES |
|
223 |
@ivar type: upcall type (server reply, expired request, ...) |
|
224 |
@type orig_request: L{objects.ConfdRequest} |
|
225 |
@ivar orig_request: original request |
|
226 |
@type server_reply: L{objects.ConfdReply} |
|
227 |
@ivar server_reply: server reply |
|
228 |
@type server_ip: string |
|
229 |
@ivar server_ip: answering server ip address |
|
230 |
@type server_port: int |
|
231 |
@ivar server_port: answering server port |
|
232 |
@type extra_args: any |
|
233 |
@ivar extra_args: 'args' argument of the SendRequest function |
|
234 |
|
|
235 |
""" |
|
236 |
__slots__ = [ |
|
237 |
"salt", |
|
238 |
"type", |
|
239 |
"orig_request", |
|
240 |
"server_reply", |
|
241 |
"server_ip", |
|
242 |
"server_port", |
|
243 |
"extra_args", |
|
244 |
] |
|
191 | 245 |
|
192 | 246 |
|
193 | 247 |
class ConfdClientRequest(objects.ConfdRequest): |
Also available in: Unified diff