From 5f3269fc27e1f607073e64a63dfffb0062514da2 Mon Sep 17 00:00:00 2001 From: Guido Trotter Date: Mon, 14 Sep 2009 14:13:17 +0100 Subject: [PATCH] Abstract AsyncUDPSocket to daemon This allows this extended asyncore+udp module to be used also in other daemons, and in the confd client library Signed-off-by: Guido Trotter Reviewed-by: Michael Hanselmann --- daemons/ganeti-confd | 70 +++++-------------------------------------- lib/daemon.py | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 63 deletions(-) diff --git a/daemons/ganeti-confd b/daemons/ganeti-confd index f0be30b..378ddb0 100755 --- a/daemons/ganeti-confd +++ b/daemons/ganeti-confd @@ -45,7 +45,7 @@ from ganeti.asyncnotifier import AsyncNotifier from ganeti.confd.server import ConfdProcessor -class ConfdAsyncUDPServer(asyncore.dispatcher): +class ConfdAsyncUDPServer(daemon.AsyncUDPSocket): """The confd udp server, suitable for use with asyncore. """ @@ -60,74 +60,18 @@ class ConfdAsyncUDPServer(asyncore.dispatcher): @param reader: ConfigReader to use to access the config """ - asyncore.dispatcher.__init__(self) + daemon.AsyncUDPSocket.__init__(self) self.bind_address = bind_address self.port = port self.processor = processor - self.out_queue = [] - self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) self.bind((bind_address, port)) logging.debug("listening on ('%s':%d)" % (bind_address, port)) - # this method is overriding an asyncore.dispatcher method - def handle_connect(self): - # Python thinks that the first udp message from a source qualifies as a - # "connect" and further ones are part of the same connection. We beg to - # differ and treat all messages equally. - pass - - # this method is overriding an asyncore.dispatcher method - def handle_read(self): - try: - try: - payload_in, address = self.recvfrom(4096) - except socket.error, err: - if err.errno == errno.EINTR: - # we got a signal while trying to read. no need to do anything, - # handle_read will be called again if there is data on the socket. - return - else: - raise - ip, port = address - payload_out = self.processor.ExecQuery(payload_in, ip, port) - if payload_out is not None: - self.out_queue.append((ip, port, payload_out)) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - # this method is overriding an asyncore.dispatcher method - def writable(self): - # Only handle writes if we have something enqueued to write - if self.out_queue: - return True - else: - return False - - def handle_write(self): - try: - if not self.out_queue: - logging.error("handle_write called with empty output queue") - return - (ip, port, payload) = self.out_queue[0] - try: - self.sendto(payload, 0, (ip, port)) - except socket.error, err: - if err.errno == errno.EINTR: - # we got a signal while trying to write. no need to do anything, - # handle_write will be called again because we haven't emptied the - # out_queue, and we'll try again - return - else: - raise - self.out_queue.pop(0) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) + # this method is overriding a daemon.AsyncUDPSocket method + def handle_datagram(self, payload_in, ip, port): + payload_out = self.processor.ExecQuery(payload_in, ip, port) + if payload_out is not None: + self.enqueue_send(ip, port, payload_out) class ConfdInotifyEventHandler(pyinotify.ProcessEvent): diff --git a/lib/daemon.py b/lib/daemon.py index 8e668fc..9467bb9 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -30,6 +30,7 @@ import errno import logging import sched import time +import socket from ganeti import utils from ganeti import constants @@ -71,6 +72,85 @@ class AsyncoreScheduler(sched.scheduler): sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction) +class AsyncUDPSocket(asyncore.dispatcher): + """An improved asyncore udp socket. + + """ + def __init__(self): + """Constructor for AsyncUDPSocket + + """ + asyncore.dispatcher.__init__(self) + self._out_queue = [] + self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) + + # this method is overriding an asyncore.dispatcher method + def handle_connect(self): + # Python thinks that the first udp message from a source qualifies as a + # "connect" and further ones are part of the same connection. We beg to + # differ and treat all messages equally. + pass + + # this method is overriding an asyncore.dispatcher method + def handle_read(self): + try: + try: + payload, address = self.recvfrom(4096) + except socket.error, err: + if err.errno == errno.EINTR: + # we got a signal while trying to read. no need to do anything, + # handle_read will be called again if there is data on the socket. + return + else: + raise + ip, port = address + self.handle_datagram(payload, ip, port) + except: + # we need to catch any exception here, log it, but proceed, because even + # if we failed handling a single request, we still want to continue. + logging.error("Unexpected exception", exc_info=True) + + def handle_datagram(self, payload, ip, port): + """Handle an already read udp datagram + + """ + raise NotImplementedError + + # this method is overriding an asyncore.dispatcher method + def writable(self): + # We should check whether we can write to the socket only if we have + # something scheduled to be written + return bool(self._out_queue) + + def handle_write(self): + try: + if not self._out_queue: + logging.error("handle_write called with empty output queue") + return + (ip, port, payload) = self._out_queue[0] + try: + self.sendto(payload, 0, (ip, port)) + except socket.error, err: + if err.errno == errno.EINTR: + # we got a signal while trying to write. no need to do anything, + # handle_write will be called again because we haven't emptied the + # _out_queue, and we'll try again + return + else: + raise + self._out_queue.pop(0) + except: + # we need to catch any exception here, log it, but proceed, because even + # if we failed sending a single datagram we still want to continue. + logging.error("Unexpected exception", exc_info=True) + + def enqueue_send(self, ip, port, payload): + """Enqueue a datagram to be sent when possible + + """ + self._out_queue.append((ip, port, payload)) + + class Mainloop(object): """Generic mainloop for daemons -- 1.7.10.4