Revision 7e5a6e86 daemons/ganeti-masterd
b/daemons/ganeti-masterd | ||
---|---|---|
34 | 34 |
import pwd |
35 | 35 |
import sys |
36 | 36 |
import socket |
37 |
import SocketServer |
|
38 | 37 |
import time |
39 | 38 |
import tempfile |
40 |
import collections |
|
41 | 39 |
import logging |
42 | 40 |
|
43 | 41 |
from optparse import OptionParser |
... | ... | |
66 | 64 |
|
67 | 65 |
class ClientRequestWorker(workerpool.BaseWorker): |
68 | 66 |
# pylint: disable-msg=W0221 |
69 |
def RunTask(self, server, request, client_address):
|
|
67 |
def RunTask(self, server, message, client):
|
|
70 | 68 |
"""Process the request. |
71 | 69 |
|
72 | 70 |
""" |
71 |
client_ops = ClientOps(server) |
|
72 |
|
|
73 | 73 |
try: |
74 |
server.request_handler_class(request, client_address, server) |
|
75 |
finally: |
|
76 |
request.close() |
|
74 |
(method, args) = luxi.ParseRequest(message) |
|
75 |
except luxi.ProtocolError, err: |
|
76 |
logging.error("Protocol Error: %s", err) |
|
77 |
client.close_log() |
|
78 |
return |
|
79 |
|
|
80 |
success = False |
|
81 |
try: |
|
82 |
result = client_ops.handle_request(method, args) |
|
83 |
success = True |
|
84 |
except errors.GenericError, err: |
|
85 |
logging.exception("Unexpected exception") |
|
86 |
success = False |
|
87 |
result = errors.EncodeException(err) |
|
88 |
except: |
|
89 |
logging.exception("Unexpected exception") |
|
90 |
err = sys.exc_info() |
|
91 |
result = "Caught exception: %s" % str(err[1]) |
|
92 |
|
|
93 |
try: |
|
94 |
reply = luxi.FormatResponse(success, result) |
|
95 |
client.send_message(reply) |
|
96 |
# awake the main thread so that it can write out the data. |
|
97 |
server.awaker.signal() |
|
98 |
except: |
|
99 |
logging.exception("Send error") |
|
100 |
client.close_log() |
|
101 |
|
|
102 |
|
|
103 |
class MasterClientHandler(daemon.AsyncTerminatedMessageStream): |
|
104 |
"""Handler for master peers. |
|
105 |
|
|
106 |
""" |
|
107 |
_MAX_UNHANDLED = 1 |
|
108 |
def __init__(self, server, connected_socket, client_address, family): |
|
109 |
daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket, |
|
110 |
client_address, |
|
111 |
constants.LUXI_EOM, |
|
112 |
family, self._MAX_UNHANDLED) |
|
113 |
self.server = server |
|
114 |
|
|
115 |
def handle_message(self, message, _): |
|
116 |
self.server.request_workers.AddTask(self.server, message, self) |
|
77 | 117 |
|
78 | 118 |
|
79 | 119 |
class MasterServer(daemon.AsyncStreamServer): |
... | ... | |
83 | 123 |
master socket. |
84 | 124 |
|
85 | 125 |
""" |
86 |
def __init__(self, mainloop, address, handler_class, uid, gid): |
|
126 |
family = socket.AF_UNIX |
|
127 |
|
|
128 |
def __init__(self, mainloop, address, uid, gid): |
|
87 | 129 |
"""MasterServer constructor |
88 | 130 |
|
89 | 131 |
@type mainloop: ganeti.daemon.Mainloop |
90 | 132 |
@param mainloop: Mainloop used to poll for I/O events |
91 | 133 |
@param address: the unix socket address to bind the MasterServer to |
92 |
@param handler_class: handler class for the connections |
|
93 | 134 |
@param uid: The uid of the owner of the socket |
94 | 135 |
@param gid: The gid of the owner of the socket |
95 | 136 |
|
96 | 137 |
""" |
97 | 138 |
temp_name = tempfile.mktemp(dir=os.path.dirname(address)) |
98 |
daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, temp_name)
|
|
139 |
daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
|
|
99 | 140 |
os.chmod(temp_name, 0770) |
100 | 141 |
os.chown(temp_name, uid, gid) |
101 | 142 |
os.rename(temp_name, address) |
102 | 143 |
|
103 |
self.request_handler_class = handler_class |
|
104 | 144 |
self.mainloop = mainloop |
145 |
self.awaker = daemon.AsyncAwaker() |
|
105 | 146 |
|
106 | 147 |
# We'll only start threads once we've forked. |
107 | 148 |
self.context = None |
108 | 149 |
self.request_workers = None |
109 | 150 |
|
110 | 151 |
def handle_connection(self, connected_socket, client_address): |
111 |
self.request_workers.AddTask(self, connected_socket, client_address) |
|
152 |
# TODO: add connection count and limit the number of open connections to a |
|
153 |
# maximum number to avoid breaking for lack of file descriptors or memory. |
|
154 |
MasterClientHandler(self, connected_socket, client_address, self.family) |
|
112 | 155 |
|
113 | 156 |
def setup_queue(self): |
114 | 157 |
self.context = GanetiContext() |
... | ... | |
132 | 175 |
self.context.jobqueue.Shutdown() |
133 | 176 |
|
134 | 177 |
|
135 |
class ClientRqHandler(SocketServer.BaseRequestHandler): |
|
136 |
"""Client handler""" |
|
137 |
READ_SIZE = 4096 |
|
138 |
|
|
139 |
def setup(self): |
|
140 |
# pylint: disable-msg=W0201 |
|
141 |
# setup() is the api for initialising for this class |
|
142 |
self._buffer = "" |
|
143 |
self._msgs = collections.deque() |
|
144 |
self._ops = ClientOps(self.server) |
|
145 |
|
|
146 |
def handle(self): |
|
147 |
while True: |
|
148 |
msg = self.read_message() |
|
149 |
if msg is None: |
|
150 |
logging.debug("client closed connection") |
|
151 |
break |
|
152 |
|
|
153 |
(method, args) = luxi.ParseRequest(msg) |
|
154 |
|
|
155 |
success = False |
|
156 |
try: |
|
157 |
result = self._ops.handle_request(method, args) |
|
158 |
success = True |
|
159 |
except errors.GenericError, err: |
|
160 |
logging.exception("Unexpected exception") |
|
161 |
result = errors.EncodeException(err) |
|
162 |
except: |
|
163 |
logging.exception("Unexpected exception") |
|
164 |
result = "Caught exception: %s" % str(sys.exc_info()[1]) |
|
165 |
|
|
166 |
self.send_message(luxi.FormatResponse(success, result)) |
|
167 |
|
|
168 |
def read_message(self): |
|
169 |
while not self._msgs: |
|
170 |
data = self.request.recv(self.READ_SIZE) |
|
171 |
if not data: |
|
172 |
return None |
|
173 |
new_msgs = (self._buffer + data).split(constants.LUXI_EOM) |
|
174 |
self._buffer = new_msgs.pop() |
|
175 |
self._msgs.extend(new_msgs) |
|
176 |
return self._msgs.popleft() |
|
177 |
|
|
178 |
def send_message(self, msg): |
|
179 |
# TODO: sendall is not guaranteed to send everything |
|
180 |
self.request.sendall(msg + constants.LUXI_EOM) |
|
181 |
|
|
182 |
|
|
183 | 178 |
class ClientOps: |
184 | 179 |
"""Class holding high-level client operations.""" |
185 | 180 |
def __init__(self, server): |
... | ... | |
526 | 521 |
utils.RemoveFile(constants.MASTER_SOCKET) |
527 | 522 |
|
528 | 523 |
mainloop = daemon.Mainloop() |
529 |
master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler,
|
|
524 |
master = MasterServer(mainloop, constants.MASTER_SOCKET, |
|
530 | 525 |
options.uid, options.gid) |
531 | 526 |
try: |
532 | 527 |
rpc.Init() |
Also available in: Unified diff