Revision 5f3269fc
b/daemons/ganeti-confd | ||
---|---|---|
45 | 45 |
from ganeti.confd.server import ConfdProcessor |
46 | 46 |
|
47 | 47 |
|
48 |
class ConfdAsyncUDPServer(asyncore.dispatcher):
|
|
48 |
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
|
|
49 | 49 |
"""The confd udp server, suitable for use with asyncore. |
50 | 50 |
|
51 | 51 |
""" |
... | ... | |
60 | 60 |
@param reader: ConfigReader to use to access the config |
61 | 61 |
|
62 | 62 |
""" |
63 |
asyncore.dispatcher.__init__(self)
|
|
63 |
daemon.AsyncUDPSocket.__init__(self)
|
|
64 | 64 |
self.bind_address = bind_address |
65 | 65 |
self.port = port |
66 | 66 |
self.processor = processor |
67 |
self.out_queue = [] |
|
68 |
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) |
|
69 | 67 |
self.bind((bind_address, port)) |
70 | 68 |
logging.debug("listening on ('%s':%d)" % (bind_address, port)) |
71 | 69 |
|
72 |
# this method is overriding an asyncore.dispatcher method |
|
73 |
def handle_connect(self): |
|
74 |
# Python thinks that the first udp message from a source qualifies as a |
|
75 |
# "connect" and further ones are part of the same connection. We beg to |
|
76 |
# differ and treat all messages equally. |
|
77 |
pass |
|
78 |
|
|
79 |
# this method is overriding an asyncore.dispatcher method |
|
80 |
def handle_read(self): |
|
81 |
try: |
|
82 |
try: |
|
83 |
payload_in, address = self.recvfrom(4096) |
|
84 |
except socket.error, err: |
|
85 |
if err.errno == errno.EINTR: |
|
86 |
# we got a signal while trying to read. no need to do anything, |
|
87 |
# handle_read will be called again if there is data on the socket. |
|
88 |
return |
|
89 |
else: |
|
90 |
raise |
|
91 |
ip, port = address |
|
92 |
payload_out = self.processor.ExecQuery(payload_in, ip, port) |
|
93 |
if payload_out is not None: |
|
94 |
self.out_queue.append((ip, port, payload_out)) |
|
95 |
except: |
|
96 |
# we need to catch any exception here, log it, but proceed, because even |
|
97 |
# if we failed handling a single request, we still want the confd to |
|
98 |
# continue working. |
|
99 |
logging.error("Unexpected exception", exc_info=True) |
|
100 |
|
|
101 |
# this method is overriding an asyncore.dispatcher method |
|
102 |
def writable(self): |
|
103 |
# Only handle writes if we have something enqueued to write |
|
104 |
if self.out_queue: |
|
105 |
return True |
|
106 |
else: |
|
107 |
return False |
|
108 |
|
|
109 |
def handle_write(self): |
|
110 |
try: |
|
111 |
if not self.out_queue: |
|
112 |
logging.error("handle_write called with empty output queue") |
|
113 |
return |
|
114 |
(ip, port, payload) = self.out_queue[0] |
|
115 |
try: |
|
116 |
self.sendto(payload, 0, (ip, port)) |
|
117 |
except socket.error, err: |
|
118 |
if err.errno == errno.EINTR: |
|
119 |
# we got a signal while trying to write. no need to do anything, |
|
120 |
# handle_write will be called again because we haven't emptied the |
|
121 |
# out_queue, and we'll try again |
|
122 |
return |
|
123 |
else: |
|
124 |
raise |
|
125 |
self.out_queue.pop(0) |
|
126 |
except: |
|
127 |
# we need to catch any exception here, log it, but proceed, because even |
|
128 |
# if we failed handling a single request, we still want the confd to |
|
129 |
# continue working. |
|
130 |
logging.error("Unexpected exception", exc_info=True) |
|
70 |
# this method is overriding a daemon.AsyncUDPSocket method |
|
71 |
def handle_datagram(self, payload_in, ip, port): |
|
72 |
payload_out = self.processor.ExecQuery(payload_in, ip, port) |
|
73 |
if payload_out is not None: |
|
74 |
self.enqueue_send(ip, port, payload_out) |
|
131 | 75 |
|
132 | 76 |
|
133 | 77 |
class ConfdInotifyEventHandler(pyinotify.ProcessEvent): |
b/lib/daemon.py | ||
---|---|---|
30 | 30 |
import logging |
31 | 31 |
import sched |
32 | 32 |
import time |
33 |
import socket |
|
33 | 34 |
|
34 | 35 |
from ganeti import utils |
35 | 36 |
from ganeti import constants |
... | ... | |
71 | 72 |
sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction) |
72 | 73 |
|
73 | 74 |
|
75 |
class AsyncUDPSocket(asyncore.dispatcher): |
|
76 |
"""An improved asyncore udp socket. |
|
77 |
|
|
78 |
""" |
|
79 |
def __init__(self): |
|
80 |
"""Constructor for AsyncUDPSocket |
|
81 |
|
|
82 |
""" |
|
83 |
asyncore.dispatcher.__init__(self) |
|
84 |
self._out_queue = [] |
|
85 |
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) |
|
86 |
|
|
87 |
# this method is overriding an asyncore.dispatcher method |
|
88 |
def handle_connect(self): |
|
89 |
# Python thinks that the first udp message from a source qualifies as a |
|
90 |
# "connect" and further ones are part of the same connection. We beg to |
|
91 |
# differ and treat all messages equally. |
|
92 |
pass |
|
93 |
|
|
94 |
# this method is overriding an asyncore.dispatcher method |
|
95 |
def handle_read(self): |
|
96 |
try: |
|
97 |
try: |
|
98 |
payload, address = self.recvfrom(4096) |
|
99 |
except socket.error, err: |
|
100 |
if err.errno == errno.EINTR: |
|
101 |
# we got a signal while trying to read. no need to do anything, |
|
102 |
# handle_read will be called again if there is data on the socket. |
|
103 |
return |
|
104 |
else: |
|
105 |
raise |
|
106 |
ip, port = address |
|
107 |
self.handle_datagram(payload, ip, port) |
|
108 |
except: |
|
109 |
# we need to catch any exception here, log it, but proceed, because even |
|
110 |
# if we failed handling a single request, we still want to continue. |
|
111 |
logging.error("Unexpected exception", exc_info=True) |
|
112 |
|
|
113 |
def handle_datagram(self, payload, ip, port): |
|
114 |
"""Handle an already read udp datagram |
|
115 |
|
|
116 |
""" |
|
117 |
raise NotImplementedError |
|
118 |
|
|
119 |
# this method is overriding an asyncore.dispatcher method |
|
120 |
def writable(self): |
|
121 |
# We should check whether we can write to the socket only if we have |
|
122 |
# something scheduled to be written |
|
123 |
return bool(self._out_queue) |
|
124 |
|
|
125 |
def handle_write(self): |
|
126 |
try: |
|
127 |
if not self._out_queue: |
|
128 |
logging.error("handle_write called with empty output queue") |
|
129 |
return |
|
130 |
(ip, port, payload) = self._out_queue[0] |
|
131 |
try: |
|
132 |
self.sendto(payload, 0, (ip, port)) |
|
133 |
except socket.error, err: |
|
134 |
if err.errno == errno.EINTR: |
|
135 |
# we got a signal while trying to write. no need to do anything, |
|
136 |
# handle_write will be called again because we haven't emptied the |
|
137 |
# _out_queue, and we'll try again |
|
138 |
return |
|
139 |
else: |
|
140 |
raise |
|
141 |
self._out_queue.pop(0) |
|
142 |
except: |
|
143 |
# we need to catch any exception here, log it, but proceed, because even |
|
144 |
# if we failed sending a single datagram we still want to continue. |
|
145 |
logging.error("Unexpected exception", exc_info=True) |
|
146 |
|
|
147 |
def enqueue_send(self, ip, port, payload): |
|
148 |
"""Enqueue a datagram to be sent when possible |
|
149 |
|
|
150 |
""" |
|
151 |
self._out_queue.append((ip, port, payload)) |
|
152 |
|
|
153 |
|
|
74 | 154 |
class Mainloop(object): |
75 | 155 |
"""Generic mainloop for daemons |
76 | 156 |
|
Also available in: Unified diff