Revision 71e114da lib/confd/client.py
b/lib/confd/client.py | ||
---|---|---|
85 | 85 |
self.client.HandleResponse(payload, ip, port) |
86 | 86 |
|
87 | 87 |
|
88 |
class _Request(object): |
|
89 |
"""Request status structure. |
|
90 |
|
|
91 |
@ivar request: the request data |
|
92 |
@ivar args: any extra arguments for the callback |
|
93 |
@ivar expiry: the expiry timestamp of the request |
|
94 |
|
|
95 |
""" |
|
96 |
def __init__(self, request, args, expiry): |
|
97 |
self.request = request |
|
98 |
self.args = args |
|
99 |
self.expiry = expiry |
|
100 |
|
|
101 |
|
|
88 | 102 |
class ConfdClient: |
89 | 103 |
"""Send queries to confd, and get back answers. |
90 | 104 |
|
... | ... | |
92 | 106 |
getting back answers, this is an asynchronous library. It can either work |
93 | 107 |
through asyncore or with your own handling. |
94 | 108 |
|
109 |
@type _requests: dict |
|
110 |
@ivar _requests: dictionary indexes by salt, which contains data |
|
111 |
about the outstanding requests; the values are objects of type |
|
112 |
L{_Request} |
|
113 |
|
|
95 | 114 |
""" |
96 | 115 |
def __init__(self, hmac_key, peers, callback, port=None, logger=None): |
97 | 116 |
"""Constructor for ConfdClient |
... | ... | |
118 | 137 |
self._confd_port = port |
119 | 138 |
self._logger = logger |
120 | 139 |
self._requests = {} |
121 |
self._expire_requests = [] |
|
122 | 140 |
|
123 | 141 |
if self._confd_port is None: |
124 | 142 |
self._confd_port = utils.GetDaemonPort(constants.CONFD) |
... | ... | |
161 | 179 |
|
162 | 180 |
""" |
163 | 181 |
now = time.time() |
164 |
while self._expire_requests: |
|
165 |
expire_time, rsalt = self._expire_requests[0] |
|
166 |
if now >= expire_time: |
|
167 |
self._expire_requests.pop(0) |
|
168 |
(request, args) = self._requests[rsalt] |
|
182 |
for rsalt, rq in self._requests.items(): |
|
183 |
if now >= rq.expiry: |
|
169 | 184 |
del self._requests[rsalt] |
170 | 185 |
client_reply = ConfdUpcallPayload(salt=rsalt, |
171 | 186 |
type=UPCALL_EXPIRE, |
172 |
orig_request=request, |
|
173 |
extra_args=args, |
|
187 |
orig_request=rq.request,
|
|
188 |
extra_args=rq.args,
|
|
174 | 189 |
client=self, |
175 | 190 |
) |
176 | 191 |
self._callback(client_reply) |
177 |
else: |
|
178 |
break |
|
179 | 192 |
|
180 | 193 |
def SendRequest(self, request, args=None, coverage=None, async=True): |
181 | 194 |
"""Send a confd request to some MCs |
... | ... | |
219 | 232 |
except errors.UdpDataSizeError: |
220 | 233 |
raise errors.ConfdClientError("Request too big") |
221 | 234 |
|
222 |
self._requests[request.rsalt] = (request, args) |
|
223 | 235 |
expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT |
224 |
self._expire_requests.append((expire_time, request.rsalt))
|
|
236 |
self._requests[request.rsalt] = _Request(request, args, expire_time)
|
|
225 | 237 |
|
226 | 238 |
if not async: |
227 | 239 |
self.FlushSendQueue() |
... | ... | |
241 | 253 |
return |
242 | 254 |
|
243 | 255 |
try: |
244 |
(request, args) = self._requests[salt]
|
|
256 |
rq = self._requests[salt]
|
|
245 | 257 |
except KeyError: |
246 | 258 |
if self._logger: |
247 | 259 |
self._logger.debug("Discarding unknown (expired?) reply: %s" % err) |
... | ... | |
250 | 262 |
client_reply = ConfdUpcallPayload(salt=salt, |
251 | 263 |
type=UPCALL_REPLY, |
252 | 264 |
server_reply=answer, |
253 |
orig_request=request, |
|
265 |
orig_request=rq.request,
|
|
254 | 266 |
server_ip=ip, |
255 | 267 |
server_port=port, |
256 |
extra_args=args, |
|
268 |
extra_args=rq.args,
|
|
257 | 269 |
client=self, |
258 | 270 |
) |
259 | 271 |
self._callback(client_reply) |
... | ... | |
510 | 522 |
self._HandleExpire(up) |
511 | 523 |
self._callback(up) |
512 | 524 |
|
525 |
|
|
513 | 526 |
def GetConfdClient(callback): |
514 | 527 |
"""Return a client configured using the given callback. |
515 | 528 |
|
Also available in: Unified diff