Revision b66ab629

b/lib/daemon.py
23 23

  
24 24

  
25 25
import asyncore
26
import asynchat
26 27
import os
27 28
import signal
28 29
import logging
......
160 161
    raise NotImplementedError
161 162

  
162 163

  
164
class AsyncTerminatedMessageStream(asynchat.async_chat):
165
  """A terminator separated message stream asyncore module.
166

  
167
  Handles a stream connection receiving messages terminated by a defined
168
  separator. For each complete message handle_message is called.
169

  
170
  """
171
  def __init__(self, connected_socket, peer_address, terminator, family):
172
    """AsyncTerminatedMessageStream constructor.
173

  
174
    @type connected_socket: socket.socket
175
    @param connected_socket: connected stream socket to receive messages from
176
    @param peer_address: family-specific peer address
177
    @type terminator: string
178
    @param terminator: terminator separating messages in the stream
179
    @type family: integer
180
    @param family: socket family
181

  
182
    """
183
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
184
    # using a positional argument rather than a keyword one.
185
    asynchat.async_chat.__init__(self, connected_socket)
186
    self.connected_socket = connected_socket
187
    # on python 2.4 there is no "family" attribute for the socket class
188
    # FIXME: when we move to python 2.5 or above remove the family parameter
189
    #self.family = self.connected_socket.family
190
    self.family = family
191
    self.peer_address = peer_address
192
    self.terminator = terminator
193
    self.set_terminator(terminator)
194
    self.ibuffer = []
195
    self.next_incoming_message = 0
196

  
197
  # this method is overriding an asynchat.async_chat method
198
  def collect_incoming_data(self, data):
199
    self.ibuffer.append(data)
200

  
201
  # this method is overriding an asynchat.async_chat method
202
  def found_terminator(self):
203
    message = "".join(self.ibuffer)
204
    self.ibuffer = []
205
    message_id = self.next_incoming_message
206
    self.next_incoming_message += 1
207
    self.handle_message(message, message_id)
208

  
209
  def handle_message(self, message, message_id):
210
    """Handle a terminated message.
211

  
212
    @type message: string
213
    @param message: message to handle
214
    @type message_id: integer
215
    @param message_id: stream's message sequence number
216

  
217
    """
218
    pass
219
    # TODO: move this method to raise NotImplementedError
220
    # raise NotImplementedError
221

  
222
  def close_log(self):
223
    logging.info("Closing connection from %s",
224
                 FormatAddress(self.family, self.peer_address))
225
    self.close()
226

  
227
  # this method is overriding an asyncore.dispatcher method
228
  def handle_expt(self):
229
    self.close_log()
230

  
231
  # this method is overriding an asyncore.dispatcher method
232
  def handle_error(self):
233
    """Log an error in handling any request, and proceed.
234

  
235
    """
236
    logging.exception("Error while handling asyncore request")
237
    self.close_log()
238

  
239

  
163 240
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
164 241
  """An improved asyncore udp socket.
165 242

  

Also available in: Unified diff