Revision cdd7f900 daemons/ganeti-masterd
b/daemons/ganeti-masterd | ||
---|---|---|
30 | 30 |
# C0103: Invalid name ganeti-masterd |
31 | 31 |
|
32 | 32 |
import sys |
33 |
import socket |
|
33 | 34 |
import SocketServer |
34 | 35 |
import time |
35 | 36 |
import collections |
36 |
import signal |
|
37 | 37 |
import logging |
38 | 38 |
|
39 | 39 |
from optparse import OptionParser |
... | ... | |
65 | 65 |
def RunTask(self, server, request, client_address): |
66 | 66 |
"""Process the request. |
67 | 67 |
|
68 |
This is copied from the code in ThreadingMixIn. |
|
69 |
|
|
70 | 68 |
""" |
71 | 69 |
try: |
72 |
server.finish_request(request, client_address) |
|
73 |
server.close_request(request) |
|
74 |
except: # pylint: disable-msg=W0702 |
|
75 |
server.handle_error(request, client_address) |
|
76 |
server.close_request(request) |
|
70 |
server.request_handler_class(request, client_address, server) |
|
71 |
finally: |
|
72 |
request.close() |
|
77 | 73 |
|
78 | 74 |
|
79 |
class IOServer(SocketServer.UnixStreamServer):
|
|
80 |
"""IO thread class.
|
|
75 |
class MasterServer(daemon.AsyncStreamServer):
|
|
76 |
"""Master Server.
|
|
81 | 77 |
|
82 |
This class takes care of initializing the other threads, setting |
|
83 |
signal handlers (which are processed only in this thread), and doing |
|
84 |
cleanup at shutdown. |
|
78 |
This is the main asynchronous master server. It handles connections to the |
|
79 |
master socket. |
|
85 | 80 |
|
86 | 81 |
""" |
87 |
def __init__(self, address, rqhandler):
|
|
88 |
"""IOServer constructor
|
|
82 |
def __init__(self, mainloop, address, handler_class):
|
|
83 |
"""MasterServer constructor
|
|
89 | 84 |
|
90 |
@param address: the address to bind this IOServer to |
|
91 |
@param rqhandler: RequestHandler type object |
|
85 |
@type mainloop: ganeti.daemon.Mainloop |
|
86 |
@param mainloop: Mainloop used to poll for I/O events |
|
87 |
@param address: the unix socket address to bind the MasterServer to |
|
88 |
@param handler_class: handler class for the connections |
|
92 | 89 |
|
93 | 90 |
""" |
94 |
SocketServer.UnixStreamServer.__init__(self, address, rqhandler) |
|
91 |
daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address) |
|
92 |
self.request_handler_class = handler_class |
|
93 |
self.mainloop = mainloop |
|
95 | 94 |
|
96 | 95 |
# We'll only start threads once we've forked. |
97 | 96 |
self.context = None |
98 | 97 |
self.request_workers = None |
99 | 98 |
|
99 |
def handle_connection(self, connected_socket, client_address): |
|
100 |
self.request_workers.AddTask(self, connected_socket, client_address) |
|
101 |
|
|
100 | 102 |
def setup_queue(self): |
101 | 103 |
self.context = GanetiContext() |
102 | 104 |
self.request_workers = workerpool.WorkerPool("ClientReq", |
103 | 105 |
CLIENT_REQUEST_WORKERS, |
104 | 106 |
ClientRequestWorker) |
105 | 107 |
|
106 |
def process_request(self, request, client_address): |
|
107 |
"""Add task to workerpool to process request. |
|
108 |
|
|
109 |
""" |
|
110 |
(pid, uid, gid) = utils.GetSocketCredentials(request) |
|
111 |
logging.info("Accepted connection from pid=%s, uid=%s, gid=%s", |
|
112 |
pid, uid, gid) |
|
113 |
|
|
114 |
self.request_workers.AddTask(self, request, client_address) |
|
115 |
|
|
116 |
def handle_error(self, request, client_address): |
|
117 |
logging.exception("Error while handling request") |
|
118 |
|
|
119 |
@utils.SignalHandled([signal.SIGINT, signal.SIGTERM]) |
|
120 |
def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221 |
|
121 |
"""Handle one request at a time until told to quit.""" |
|
122 |
assert isinstance(signal_handlers, dict) and \ |
|
123 |
len(signal_handlers) > 0, \ |
|
124 |
"Broken SignalHandled decorator" |
|
125 |
# Since we use SignalHandled only once, the resulting dict will map all |
|
126 |
# signals to the same handler. We'll just use the first one. |
|
127 |
sighandler = signal_handlers.values()[0] |
|
128 |
while not sighandler.called: |
|
129 |
self.handle_request() |
|
130 |
|
|
131 | 108 |
def server_cleanup(self): |
132 | 109 |
"""Cleanup the server. |
133 | 110 |
|
... | ... | |
136 | 113 |
|
137 | 114 |
""" |
138 | 115 |
try: |
139 |
self.server_close()
|
|
116 |
self.close() |
|
140 | 117 |
finally: |
141 | 118 |
if self.request_workers: |
142 | 119 |
self.request_workers.TerminateWorkers() |
... | ... | |
528 | 505 |
# concurrent execution. |
529 | 506 |
utils.RemoveFile(constants.MASTER_SOCKET) |
530 | 507 |
|
531 |
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) |
|
508 |
mainloop = daemon.Mainloop() |
|
509 |
master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler) |
|
532 | 510 |
try: |
533 | 511 |
rpc.Init() |
534 | 512 |
try: |
... | ... | |
541 | 519 |
|
542 | 520 |
master.setup_queue() |
543 | 521 |
try: |
544 |
master.serve_forever()
|
|
522 |
mainloop.Run()
|
|
545 | 523 |
finally: |
546 | 524 |
master.server_cleanup() |
547 | 525 |
finally: |
Also available in: Unified diff