Revision 5f3269fc

b/daemons/ganeti-confd
45 45
from ganeti.confd.server import ConfdProcessor
46 46

  
47 47

  
48
class ConfdAsyncUDPServer(asyncore.dispatcher):
48
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
49 49
  """The confd udp server, suitable for use with asyncore.
50 50

  
51 51
  """
......
60 60
    @param reader: ConfigReader to use to access the config
61 61

  
62 62
    """
63
    asyncore.dispatcher.__init__(self)
63
    daemon.AsyncUDPSocket.__init__(self)
64 64
    self.bind_address = bind_address
65 65
    self.port = port
66 66
    self.processor = processor
67
    self.out_queue = []
68
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
69 67
    self.bind((bind_address, port))
70 68
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
71 69

  
72
  # this method is overriding an asyncore.dispatcher method
73
  def handle_connect(self):
74
    # Python thinks that the first udp message from a source qualifies as a
75
    # "connect" and further ones are part of the same connection. We beg to
76
    # differ and treat all messages equally.
77
    pass
78

  
79
  # this method is overriding an asyncore.dispatcher method
80
  def handle_read(self):
81
    try:
82
      try:
83
        payload_in, address = self.recvfrom(4096)
84
      except socket.error, err:
85
        if err.errno == errno.EINTR:
86
          # we got a signal while trying to read. no need to do anything,
87
          # handle_read will be called again if there is data on the socket.
88
          return
89
        else:
90
          raise
91
      ip, port = address
92
      payload_out =  self.processor.ExecQuery(payload_in, ip, port)
93
      if payload_out is not None:
94
        self.out_queue.append((ip, port, payload_out))
95
    except:
96
      # we need to catch any exception here, log it, but proceed, because even
97
      # if we failed handling a single request, we still want the confd to
98
      # continue working.
99
      logging.error("Unexpected exception", exc_info=True)
100

  
101
  # this method is overriding an asyncore.dispatcher method
102
  def writable(self):
103
    # Only handle writes if we have something enqueued to write
104
    if self.out_queue:
105
      return True
106
    else:
107
      return False
108

  
109
  def handle_write(self):
110
    try:
111
      if not self.out_queue:
112
        logging.error("handle_write called with empty output queue")
113
        return
114
      (ip, port, payload) = self.out_queue[0]
115
      try:
116
        self.sendto(payload, 0, (ip, port))
117
      except socket.error, err:
118
        if err.errno == errno.EINTR:
119
          # we got a signal while trying to write. no need to do anything,
120
          # handle_write will be called again because we haven't emptied the
121
          # out_queue, and we'll try again
122
          return
123
        else:
124
          raise
125
      self.out_queue.pop(0)
126
    except:
127
      # we need to catch any exception here, log it, but proceed, because even
128
      # if we failed handling a single request, we still want the confd to
129
      # continue working.
130
      logging.error("Unexpected exception", exc_info=True)
70
  # this method is overriding a daemon.AsyncUDPSocket method
71
  def handle_datagram(self, payload_in, ip, port):
72
    payload_out =  self.processor.ExecQuery(payload_in, ip, port)
73
    if payload_out is not None:
74
      self.enqueue_send(ip, port, payload_out)
131 75

  
132 76

  
133 77
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
b/lib/daemon.py
30 30
import logging
31 31
import sched
32 32
import time
33
import socket
33 34

  
34 35
from ganeti import utils
35 36
from ganeti import constants
......
71 72
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
72 73

  
73 74

  
75
class AsyncUDPSocket(asyncore.dispatcher):
76
  """An improved asyncore udp socket.
77

  
78
  """
79
  def __init__(self):
80
    """Constructor for AsyncUDPSocket
81

  
82
    """
83
    asyncore.dispatcher.__init__(self)
84
    self._out_queue = []
85
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
86

  
87
  # this method is overriding an asyncore.dispatcher method
88
  def handle_connect(self):
89
    # Python thinks that the first udp message from a source qualifies as a
90
    # "connect" and further ones are part of the same connection. We beg to
91
    # differ and treat all messages equally.
92
    pass
93

  
94
  # this method is overriding an asyncore.dispatcher method
95
  def handle_read(self):
96
    try:
97
      try:
98
        payload, address = self.recvfrom(4096)
99
      except socket.error, err:
100
        if err.errno == errno.EINTR:
101
          # we got a signal while trying to read. no need to do anything,
102
          # handle_read will be called again if there is data on the socket.
103
          return
104
        else:
105
          raise
106
      ip, port = address
107
      self.handle_datagram(payload, ip, port)
108
    except:
109
      # we need to catch any exception here, log it, but proceed, because even
110
      # if we failed handling a single request, we still want to continue.
111
      logging.error("Unexpected exception", exc_info=True)
112

  
113
  def handle_datagram(self, payload, ip, port):
114
    """Handle an already read udp datagram
115

  
116
    """
117
    raise NotImplementedError
118

  
119
  # this method is overriding an asyncore.dispatcher method
120
  def writable(self):
121
    # We should check whether we can write to the socket only if we have
122
    # something scheduled to be written
123
    return bool(self._out_queue)
124

  
125
  def handle_write(self):
126
    try:
127
      if not self._out_queue:
128
        logging.error("handle_write called with empty output queue")
129
        return
130
      (ip, port, payload) = self._out_queue[0]
131
      try:
132
        self.sendto(payload, 0, (ip, port))
133
      except socket.error, err:
134
        if err.errno == errno.EINTR:
135
          # we got a signal while trying to write. no need to do anything,
136
          # handle_write will be called again because we haven't emptied the
137
          # _out_queue, and we'll try again
138
          return
139
        else:
140
          raise
141
      self._out_queue.pop(0)
142
    except:
143
      # we need to catch any exception here, log it, but proceed, because even
144
      # if we failed sending a single datagram we still want to continue.
145
      logging.error("Unexpected exception", exc_info=True)
146

  
147
  def enqueue_send(self, ip, port, payload):
148
    """Enqueue a datagram to be sent when possible
149

  
150
    """
151
    self._out_queue.append((ip, port, payload))
152

  
153

  
74 154
class Mainloop(object):
75 155
  """Generic mainloop for daemons
76 156

  

Also available in: Unified diff