Revision 37e62cb9
b/lib/daemon.py | ||
---|---|---|
175 | 175 |
separator. For each complete message handle_message is called. |
176 | 176 |
|
177 | 177 |
""" |
178 |
def __init__(self, connected_socket, peer_address, terminator, family): |
|
178 |
def __init__(self, connected_socket, peer_address, terminator, family, |
|
179 |
unhandled_limit): |
|
179 | 180 |
"""AsyncTerminatedMessageStream constructor. |
180 | 181 |
|
181 | 182 |
@type connected_socket: socket.socket |
... | ... | |
185 | 186 |
@param terminator: terminator separating messages in the stream |
186 | 187 |
@type family: integer |
187 | 188 |
@param family: socket family |
189 |
@type unhandled_limit: integer or None |
|
190 |
@param unhandled_limit: maximum unanswered messages |
|
188 | 191 |
|
189 | 192 |
""" |
190 | 193 |
# python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by |
... | ... | |
197 | 200 |
self.family = family |
198 | 201 |
self.peer_address = peer_address |
199 | 202 |
self.terminator = terminator |
203 |
self.unhandled_limit = unhandled_limit |
|
200 | 204 |
self.set_terminator(terminator) |
201 | 205 |
self.ibuffer = [] |
202 |
self.next_incoming_message = 0 |
|
206 |
self.receive_count = 0 |
|
207 |
self.send_count = 0 |
|
203 | 208 |
self.oqueue = collections.deque() |
209 |
self.iqueue = collections.deque() |
|
204 | 210 |
|
205 | 211 |
# this method is overriding an asynchat.async_chat method |
206 | 212 |
def collect_incoming_data(self, data): |
207 | 213 |
self.ibuffer.append(data) |
208 | 214 |
|
215 |
def _can_handle_message(self): |
|
216 |
return (self.unhandled_limit is None or |
|
217 |
(self.receive_count < self.send_count + self.unhandled_limit) and |
|
218 |
not self.iqueue) |
|
219 |
|
|
209 | 220 |
# this method is overriding an asynchat.async_chat method |
210 | 221 |
def found_terminator(self): |
211 | 222 |
message = "".join(self.ibuffer) |
212 | 223 |
self.ibuffer = [] |
213 |
message_id = self.next_incoming_message |
|
214 |
self.next_incoming_message += 1 |
|
215 |
self.handle_message(message, message_id) |
|
224 |
message_id = self.receive_count |
|
225 |
# We need to increase the receive_count after checking if the message can |
|
226 |
# be handled, but before calling handle_message |
|
227 |
can_handle = self._can_handle_message() |
|
228 |
self.receive_count += 1 |
|
229 |
if can_handle: |
|
230 |
self.handle_message(message, message_id) |
|
231 |
else: |
|
232 |
self.iqueue.append((message, message_id)) |
|
216 | 233 |
|
217 | 234 |
def handle_message(self, message, message_id): |
218 | 235 |
"""Handle a terminated message. |
... | ... | |
240 | 257 |
""" |
241 | 258 |
# If we just append the message we received to the output queue, this |
242 | 259 |
# function can be safely called by multiple threads at the same time, and |
243 |
# we don't need locking, since deques are thread safe. |
|
260 |
# we don't need locking, since deques are thread safe. handle_write in the |
|
261 |
# asyncore thread will handle the next input message if there are any |
|
262 |
# enqueued. |
|
244 | 263 |
self.oqueue.append(message) |
245 | 264 |
|
246 | 265 |
# this method is overriding an asyncore.dispatcher method |
266 |
def readable(self): |
|
267 |
# read from the socket if we can handle the next requests |
|
268 |
return self._can_handle_message() and asynchat.async_chat.readable(self) |
|
269 |
|
|
270 |
# this method is overriding an asyncore.dispatcher method |
|
247 | 271 |
def writable(self): |
248 | 272 |
# the output queue may become full just after we called writable. This only |
249 | 273 |
# works if we know we'll have something else waking us up from the select, |
... | ... | |
253 | 277 |
# this method is overriding an asyncore.dispatcher method |
254 | 278 |
def handle_write(self): |
255 | 279 |
if self.oqueue: |
280 |
# if we have data in the output queue, then send_message was called. |
|
281 |
# this means we can process one more message from the input queue, if |
|
282 |
# there are any. |
|
256 | 283 |
data = self.oqueue.popleft() |
257 | 284 |
self.push(data + self.terminator) |
285 |
self.send_count += 1 |
|
286 |
if self.iqueue: |
|
287 |
self.handle_message(*self.iqueue.popleft()) |
|
258 | 288 |
self.initiate_send() |
259 | 289 |
|
260 | 290 |
def close_log(self): |
b/test/ganeti.daemon_unittest.py | ||
---|---|---|
273 | 273 |
class _MyMessageStreamHandler(daemon.AsyncTerminatedMessageStream): |
274 | 274 |
|
275 | 275 |
def __init__(self, connected_socket, client_address, terminator, family, |
276 |
message_fn, client_id): |
|
276 |
message_fn, client_id, unhandled_limit):
|
|
277 | 277 |
daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket, |
278 | 278 |
client_address, |
279 |
terminator, family) |
|
279 |
terminator, family, |
|
280 |
unhandled_limit) |
|
280 | 281 |
self.message_fn = message_fn |
281 | 282 |
self.client_id = client_id |
282 | 283 |
self.error_count = 0 |
... | ... | |
301 | 302 |
self.server = _MyAsyncStreamServer(self.family, self.address, |
302 | 303 |
self.handle_connection) |
303 | 304 |
self.client_handler = _MyMessageStreamHandler |
305 |
self.unhandled_limit = None |
|
304 | 306 |
self.terminator = "\3" |
305 | 307 |
self.address = self.server.getsockname() |
306 | 308 |
self.clients = [] |
... | ... | |
339 | 341 |
client_handler = self.client_handler(connected_socket, client_address, |
340 | 342 |
self.terminator, self.family, |
341 | 343 |
self.handle_message, |
342 |
client_id) |
|
344 |
client_id, self.unhandled_limit)
|
|
343 | 345 |
self.connections.append(client_handler) |
344 | 346 |
self.countTerminate("connect_terminate_count") |
345 | 347 |
|
... | ... | |
494 | 496 |
self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3") |
495 | 497 |
self.assertRaises(socket.error, client2.recv, 4096) |
496 | 498 |
|
499 |
def testLimitedUnhandledMessages(self): |
|
500 |
self.connect_terminate_count = None |
|
501 |
self.message_terminate_count = 3 |
|
502 |
self.unhandled_limit = 2 |
|
503 |
client1 = self.getClient() |
|
504 |
client2 = self.getClient() |
|
505 |
client1.send("one\3composed\3long\3message\3") |
|
506 |
client2.send("c2one\3") |
|
507 |
self.mainloop.Run() |
|
508 |
self.assertEquals(self.messages[0], ["one", "composed"]) |
|
509 |
self.assertEquals(self.messages[1], ["c2one"]) |
|
510 |
self.assertFalse(self.connections[0].readable()) |
|
511 |
self.assert_(self.connections[1].readable()) |
|
512 |
self.connections[0].send_message("r0") |
|
513 |
self.message_terminate_count = None |
|
514 |
client1.send("another\3") |
|
515 |
# when we write replies messages queued also get handled, but not the ones |
|
516 |
# in the socket. |
|
517 |
while self.connections[0].writable(): |
|
518 |
self.connections[0].handle_write() |
|
519 |
self.assertFalse(self.connections[0].readable()) |
|
520 |
self.assertEquals(self.messages[0], ["one", "composed", "long"]) |
|
521 |
self.connections[0].send_message("r1") |
|
522 |
self.connections[0].send_message("r2") |
|
523 |
while self.connections[0].writable(): |
|
524 |
self.connections[0].handle_write() |
|
525 |
self.assertEquals(self.messages[0], ["one", "composed", "long", "message"]) |
|
526 |
self.assert_(self.connections[0].readable()) |
|
527 |
|
|
528 |
def testLimitedUnhandledMessagesOne(self): |
|
529 |
self.connect_terminate_count = None |
|
530 |
self.message_terminate_count = 2 |
|
531 |
self.unhandled_limit = 1 |
|
532 |
client1 = self.getClient() |
|
533 |
client2 = self.getClient() |
|
534 |
client1.send("one\3composed\3message\3") |
|
535 |
client2.send("c2one\3") |
|
536 |
self.mainloop.Run() |
|
537 |
self.assertEquals(self.messages[0], ["one"]) |
|
538 |
self.assertEquals(self.messages[1], ["c2one"]) |
|
539 |
self.assertFalse(self.connections[0].readable()) |
|
540 |
self.assertFalse(self.connections[1].readable()) |
|
541 |
self.connections[0].send_message("r0") |
|
542 |
self.message_terminate_count = None |
|
543 |
while self.connections[0].writable(): |
|
544 |
self.connections[0].handle_write() |
|
545 |
self.assertFalse(self.connections[0].readable()) |
|
546 |
self.assertEquals(self.messages[0], ["one", "composed"]) |
|
547 |
self.connections[0].send_message("r2") |
|
548 |
self.connections[0].send_message("r3") |
|
549 |
while self.connections[0].writable(): |
|
550 |
self.connections[0].handle_write() |
|
551 |
self.assertEquals(self.messages[0], ["one", "composed", "message"]) |
|
552 |
self.assert_(self.connections[0].readable()) |
|
553 |
|
|
497 | 554 |
|
498 | 555 |
class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP): |
499 | 556 |
"""Test daemon.AsyncStreamServer with a Unix path connection""" |
Also available in: Unified diff