Revision 5f3269fc lib/daemon.py

b/lib/daemon.py
30 30
import logging
31 31
import sched
32 32
import time
33
import socket
33 34

  
34 35
from ganeti import utils
35 36
from ganeti import constants
......
71 72
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
72 73

  
73 74

  
75
class AsyncUDPSocket(asyncore.dispatcher):
76
  """An improved asyncore udp socket.
77

  
78
  """
79
  def __init__(self):
80
    """Constructor for AsyncUDPSocket
81

  
82
    """
83
    asyncore.dispatcher.__init__(self)
84
    self._out_queue = []
85
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
86

  
87
  # this method is overriding an asyncore.dispatcher method
88
  def handle_connect(self):
89
    # Python thinks that the first udp message from a source qualifies as a
90
    # "connect" and further ones are part of the same connection. We beg to
91
    # differ and treat all messages equally.
92
    pass
93

  
94
  # this method is overriding an asyncore.dispatcher method
95
  def handle_read(self):
96
    try:
97
      try:
98
        payload, address = self.recvfrom(4096)
99
      except socket.error, err:
100
        if err.errno == errno.EINTR:
101
          # we got a signal while trying to read. no need to do anything,
102
          # handle_read will be called again if there is data on the socket.
103
          return
104
        else:
105
          raise
106
      ip, port = address
107
      self.handle_datagram(payload, ip, port)
108
    except:
109
      # we need to catch any exception here, log it, but proceed, because even
110
      # if we failed handling a single request, we still want to continue.
111
      logging.error("Unexpected exception", exc_info=True)
112

  
113
  def handle_datagram(self, payload, ip, port):
114
    """Handle an already read udp datagram
115

  
116
    """
117
    raise NotImplementedError
118

  
119
  # this method is overriding an asyncore.dispatcher method
120
  def writable(self):
121
    # We should check whether we can write to the socket only if we have
122
    # something scheduled to be written
123
    return bool(self._out_queue)
124

  
125
  def handle_write(self):
126
    try:
127
      if not self._out_queue:
128
        logging.error("handle_write called with empty output queue")
129
        return
130
      (ip, port, payload) = self._out_queue[0]
131
      try:
132
        self.sendto(payload, 0, (ip, port))
133
      except socket.error, err:
134
        if err.errno == errno.EINTR:
135
          # we got a signal while trying to write. no need to do anything,
136
          # handle_write will be called again because we haven't emptied the
137
          # _out_queue, and we'll try again
138
          return
139
        else:
140
          raise
141
      self._out_queue.pop(0)
142
    except:
143
      # we need to catch any exception here, log it, but proceed, because even
144
      # if we failed sending a single datagram we still want to continue.
145
      logging.error("Unexpected exception", exc_info=True)
146

  
147
  def enqueue_send(self, ip, port, payload):
148
    """Enqueue a datagram to be sent when possible
149

  
150
    """
151
    self._out_queue.append((ip, port, payload))
152

  
153

  
74 154
class Mainloop(object):
75 155
  """Generic mainloop for daemons
76 156

  

Also available in: Unified diff