Revision 5f3269fc daemons/ganeti-confd

b/daemons/ganeti-confd
45 45
from ganeti.confd.server import ConfdProcessor
46 46

  
47 47

  
48
class ConfdAsyncUDPServer(asyncore.dispatcher):
48
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
49 49
  """The confd udp server, suitable for use with asyncore.
50 50

  
51 51
  """
......
60 60
    @param reader: ConfigReader to use to access the config
61 61

  
62 62
    """
63
    asyncore.dispatcher.__init__(self)
63
    daemon.AsyncUDPSocket.__init__(self)
64 64
    self.bind_address = bind_address
65 65
    self.port = port
66 66
    self.processor = processor
67
    self.out_queue = []
68
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
69 67
    self.bind((bind_address, port))
70 68
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
71 69

  
72
  # this method is overriding an asyncore.dispatcher method
73
  def handle_connect(self):
74
    # Python thinks that the first udp message from a source qualifies as a
75
    # "connect" and further ones are part of the same connection. We beg to
76
    # differ and treat all messages equally.
77
    pass
78

  
79
  # this method is overriding an asyncore.dispatcher method
80
  def handle_read(self):
81
    try:
82
      try:
83
        payload_in, address = self.recvfrom(4096)
84
      except socket.error, err:
85
        if err.errno == errno.EINTR:
86
          # we got a signal while trying to read. no need to do anything,
87
          # handle_read will be called again if there is data on the socket.
88
          return
89
        else:
90
          raise
91
      ip, port = address
92
      payload_out =  self.processor.ExecQuery(payload_in, ip, port)
93
      if payload_out is not None:
94
        self.out_queue.append((ip, port, payload_out))
95
    except:
96
      # we need to catch any exception here, log it, but proceed, because even
97
      # if we failed handling a single request, we still want the confd to
98
      # continue working.
99
      logging.error("Unexpected exception", exc_info=True)
100

  
101
  # this method is overriding an asyncore.dispatcher method
102
  def writable(self):
103
    # Only handle writes if we have something enqueued to write
104
    if self.out_queue:
105
      return True
106
    else:
107
      return False
108

  
109
  def handle_write(self):
110
    try:
111
      if not self.out_queue:
112
        logging.error("handle_write called with empty output queue")
113
        return
114
      (ip, port, payload) = self.out_queue[0]
115
      try:
116
        self.sendto(payload, 0, (ip, port))
117
      except socket.error, err:
118
        if err.errno == errno.EINTR:
119
          # we got a signal while trying to write. no need to do anything,
120
          # handle_write will be called again because we haven't emptied the
121
          # out_queue, and we'll try again
122
          return
123
        else:
124
          raise
125
      self.out_queue.pop(0)
126
    except:
127
      # we need to catch any exception here, log it, but proceed, because even
128
      # if we failed handling a single request, we still want the confd to
129
      # continue working.
130
      logging.error("Unexpected exception", exc_info=True)
70
  # this method is overriding a daemon.AsyncUDPSocket method
71
  def handle_datagram(self, payload_in, ip, port):
72
    payload_out =  self.processor.ExecQuery(payload_in, ip, port)
73
    if payload_out is not None:
74
      self.enqueue_send(ip, port, payload_out)
131 75

  
132 76

  
133 77
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):

Also available in: Unified diff