281 |
281 |
return serializer.LoadJson(data)
|
282 |
282 |
|
283 |
283 |
|
284 |
|
def WaitForSocketCondition(poller, sock, event, timeout):
|
|
284 |
def WaitForSocketCondition(sock, event, timeout):
|
285 |
285 |
"""Waits for a condition to occur on the socket.
|
286 |
286 |
|
287 |
|
@type poller: select.Poller
|
288 |
|
@param poller: Poller object as created by select.poll()
|
289 |
287 |
@type sock: socket
|
290 |
288 |
@param sock: Wait for events on this socket
|
291 |
289 |
@type event: int
|
... | ... | |
303 |
301 |
# Poller object expects milliseconds
|
304 |
302 |
timeout *= 1000
|
305 |
303 |
|
|
304 |
poller = select.poll()
|
306 |
305 |
poller.register(sock, event)
|
307 |
306 |
try:
|
308 |
307 |
while True:
|
... | ... | |
320 |
319 |
poller.unregister(sock)
|
321 |
320 |
|
322 |
321 |
|
323 |
|
def SocketOperation(poller, sock, op, arg1, timeout):
|
|
322 |
def SocketOperation(sock, op, arg1, timeout):
|
324 |
323 |
"""Wrapper around socket functions.
|
325 |
324 |
|
326 |
325 |
This function abstracts error handling for socket operations, especially
|
327 |
326 |
for the complicated interaction with OpenSSL.
|
328 |
327 |
|
329 |
|
@type poller: select.Poller
|
330 |
|
@param poller: Poller object as created by select.poll()
|
331 |
328 |
@type sock: socket
|
332 |
329 |
@param sock: Socket for the operation
|
333 |
330 |
@type op: int
|
... | ... | |
375 |
372 |
else:
|
376 |
373 |
wait_for_event = event_poll
|
377 |
374 |
|
378 |
|
event = WaitForSocketCondition(poller, sock, wait_for_event, timeout)
|
|
375 |
event = WaitForSocketCondition(sock, wait_for_event, timeout)
|
379 |
376 |
if event is None:
|
380 |
377 |
raise HttpSocketTimeout()
|
381 |
378 |
|
... | ... | |
464 |
461 |
raise
|
465 |
462 |
|
466 |
463 |
|
467 |
|
def ShutdownConnection(poller, sock, close_timeout, write_timeout, msgreader,
|
468 |
|
force):
|
|
464 |
def ShutdownConnection(sock, close_timeout, write_timeout, msgreader, force):
|
469 |
465 |
"""Closes the connection.
|
470 |
466 |
|
471 |
|
@type poller: select.Poller
|
472 |
|
@param poller: Poller object as created by select.poll()
|
473 |
467 |
@type sock: socket
|
474 |
468 |
@param sock: Socket to be shut down
|
475 |
469 |
@type close_timeout: float
|
... | ... | |
484 |
478 |
for peer
|
485 |
479 |
|
486 |
480 |
"""
|
487 |
|
poller = select.poll()
|
488 |
|
|
489 |
481 |
#print msgreader.peer_will_close, force
|
490 |
482 |
if msgreader and msgreader.peer_will_close and not force:
|
491 |
483 |
# Wait for peer to close
|
492 |
484 |
try:
|
493 |
485 |
# Check whether it's actually closed
|
494 |
|
if not SocketOperation(poller, sock, SOCKOP_RECV, 1, close_timeout):
|
|
486 |
if not SocketOperation(sock, SOCKOP_RECV, 1, close_timeout):
|
495 |
487 |
return
|
496 |
488 |
except (socket.error, HttpError, HttpSocketTimeout):
|
497 |
489 |
# Ignore errors at this stage
|
... | ... | |
500 |
492 |
# Close the connection from our side
|
501 |
493 |
try:
|
502 |
494 |
# We don't care about the return value, see NOTES in SSL_shutdown(3).
|
503 |
|
SocketOperation(poller, sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
|
|
495 |
SocketOperation(sock, SOCKOP_SHUTDOWN, socket.SHUT_RDWR,
|
504 |
496 |
write_timeout)
|
505 |
497 |
except HttpSocketTimeout:
|
506 |
498 |
raise HttpError("Timeout while shutting down connection")
|
... | ... | |
510 |
502 |
raise HttpError("Error while shutting down connection: %s" % err)
|
511 |
503 |
|
512 |
504 |
|
513 |
|
def Handshake(poller, sock, write_timeout):
|
|
505 |
def Handshake(sock, write_timeout):
|
514 |
506 |
"""Shakes peer's hands.
|
515 |
507 |
|
516 |
|
@type poller: select.Poller
|
517 |
|
@param poller: Poller object as created by select.poll()
|
518 |
508 |
@type sock: socket
|
519 |
509 |
@param sock: Socket to be shut down
|
520 |
510 |
@type write_timeout: float
|
... | ... | |
522 |
512 |
|
523 |
513 |
"""
|
524 |
514 |
try:
|
525 |
|
return SocketOperation(poller, sock, SOCKOP_HANDSHAKE, None, write_timeout)
|
|
515 |
return SocketOperation(sock, SOCKOP_HANDSHAKE, None, write_timeout)
|
526 |
516 |
except HttpSocketTimeout:
|
527 |
517 |
raise HttpError("Timeout during SSL handshake")
|
528 |
518 |
except socket.error, err:
|
... | ... | |
672 |
662 |
|
673 |
663 |
buf = self._FormatMessage()
|
674 |
664 |
|
675 |
|
poller = select.poll()
|
676 |
|
|
677 |
665 |
pos = 0
|
678 |
666 |
end = len(buf)
|
679 |
667 |
while pos < end:
|
680 |
668 |
# Send only SOCK_BUF_SIZE bytes at a time
|
681 |
669 |
data = buf[pos:(pos + SOCK_BUF_SIZE)]
|
682 |
670 |
|
683 |
|
sent = SocketOperation(poller, sock, SOCKOP_SEND, data,
|
684 |
|
write_timeout)
|
|
671 |
sent = SocketOperation(sock, SOCKOP_SEND, data, write_timeout)
|
685 |
672 |
|
686 |
673 |
# Remove sent bytes
|
687 |
674 |
pos += sent
|
... | ... | |
761 |
748 |
self.sock = sock
|
762 |
749 |
self.msg = msg
|
763 |
750 |
|
764 |
|
self.poller = select.poll()
|
765 |
751 |
self.start_line_buffer = None
|
766 |
752 |
self.header_buffer = StringIO()
|
767 |
753 |
self.body_buffer = StringIO()
|
... | ... | |
774 |
760 |
while self.parser_status != self.PS_COMPLETE:
|
775 |
761 |
# TODO: Don't read more than necessary (Content-Length), otherwise
|
776 |
762 |
# data might be lost and/or an error could occur
|
777 |
|
data = SocketOperation(self.poller, sock, SOCKOP_RECV, SOCK_BUF_SIZE,
|
778 |
|
read_timeout)
|
|
763 |
data = SocketOperation(sock, SOCKOP_RECV, SOCK_BUF_SIZE, read_timeout)
|
779 |
764 |
|
780 |
765 |
if data:
|
781 |
766 |
buf += data
|