Revision b66ab629
b/lib/daemon.py | ||
---|---|---|
23 | 23 |
|
24 | 24 |
|
25 | 25 |
import asyncore |
26 |
import asynchat |
|
26 | 27 |
import os |
27 | 28 |
import signal |
28 | 29 |
import logging |
... | ... | |
160 | 161 |
raise NotImplementedError |
161 | 162 |
|
162 | 163 |
|
164 |
class AsyncTerminatedMessageStream(asynchat.async_chat): |
|
165 |
"""A terminator separated message stream asyncore module. |
|
166 |
|
|
167 |
Handles a stream connection receiving messages terminated by a defined |
|
168 |
separator. For each complete message handle_message is called. |
|
169 |
|
|
170 |
""" |
|
171 |
def __init__(self, connected_socket, peer_address, terminator, family): |
|
172 |
"""AsyncTerminatedMessageStream constructor. |
|
173 |
|
|
174 |
@type connected_socket: socket.socket |
|
175 |
@param connected_socket: connected stream socket to receive messages from |
|
176 |
@param peer_address: family-specific peer address |
|
177 |
@type terminator: string |
|
178 |
@param terminator: terminator separating messages in the stream |
|
179 |
@type family: integer |
|
180 |
@param family: socket family |
|
181 |
|
|
182 |
""" |
|
183 |
# python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by |
|
184 |
# using a positional argument rather than a keyword one. |
|
185 |
asynchat.async_chat.__init__(self, connected_socket) |
|
186 |
self.connected_socket = connected_socket |
|
187 |
# on python 2.4 there is no "family" attribute for the socket class |
|
188 |
# FIXME: when we move to python 2.5 or above remove the family parameter |
|
189 |
#self.family = self.connected_socket.family |
|
190 |
self.family = family |
|
191 |
self.peer_address = peer_address |
|
192 |
self.terminator = terminator |
|
193 |
self.set_terminator(terminator) |
|
194 |
self.ibuffer = [] |
|
195 |
self.next_incoming_message = 0 |
|
196 |
|
|
197 |
# this method is overriding an asynchat.async_chat method |
|
198 |
def collect_incoming_data(self, data): |
|
199 |
self.ibuffer.append(data) |
|
200 |
|
|
201 |
# this method is overriding an asynchat.async_chat method |
|
202 |
def found_terminator(self): |
|
203 |
message = "".join(self.ibuffer) |
|
204 |
self.ibuffer = [] |
|
205 |
message_id = self.next_incoming_message |
|
206 |
self.next_incoming_message += 1 |
|
207 |
self.handle_message(message, message_id) |
|
208 |
|
|
209 |
def handle_message(self, message, message_id): |
|
210 |
"""Handle a terminated message. |
|
211 |
|
|
212 |
@type message: string |
|
213 |
@param message: message to handle |
|
214 |
@type message_id: integer |
|
215 |
@param message_id: stream's message sequence number |
|
216 |
|
|
217 |
""" |
|
218 |
pass |
|
219 |
# TODO: move this method to raise NotImplementedError |
|
220 |
# raise NotImplementedError |
|
221 |
|
|
222 |
def close_log(self): |
|
223 |
logging.info("Closing connection from %s", |
|
224 |
FormatAddress(self.family, self.peer_address)) |
|
225 |
self.close() |
|
226 |
|
|
227 |
# this method is overriding an asyncore.dispatcher method |
|
228 |
def handle_expt(self): |
|
229 |
self.close_log() |
|
230 |
|
|
231 |
# this method is overriding an asyncore.dispatcher method |
|
232 |
def handle_error(self): |
|
233 |
"""Log an error in handling any request, and proceed. |
|
234 |
|
|
235 |
""" |
|
236 |
logging.exception("Error while handling asyncore request") |
|
237 |
self.close_log() |
|
238 |
|
|
239 |
|
|
163 | 240 |
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): |
164 | 241 |
"""An improved asyncore udp socket. |
165 | 242 |
|
Also available in: Unified diff