Revision 23e50d39 daemons/ganeti-masterd
b/daemons/ganeti-masterd | ||
---|---|---|
29 | 29 |
|
30 | 30 |
import sys |
31 | 31 |
import SocketServer |
32 |
import threading |
|
33 | 32 |
import time |
34 | 33 |
import collections |
35 | 34 |
import Queue |
... | ... | |
52 | 51 |
from ganeti import errors |
53 | 52 |
from ganeti import ssconf |
54 | 53 |
from ganeti import logger |
54 |
from ganeti import workerpool |
|
55 | 55 |
|
56 | 56 |
|
57 |
CLIENT_REQUEST_WORKERS = 16 |
|
58 |
|
|
57 | 59 |
EXIT_NOTMASTER = constants.EXIT_NOTMASTER |
58 | 60 |
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR |
59 | 61 |
|
60 | 62 |
|
63 |
class ClientRequestWorker(workerpool.BaseWorker): |
|
64 |
def RunTask(self, server, request, client_address): |
|
65 |
"""Process the request. |
|
66 |
|
|
67 |
This is copied from the code in ThreadingMixIn. |
|
68 |
|
|
69 |
""" |
|
70 |
try: |
|
71 |
server.finish_request(request, client_address) |
|
72 |
server.close_request(request) |
|
73 |
except: |
|
74 |
server.handle_error(request, client_address) |
|
75 |
server.close_request(request) |
|
76 |
|
|
77 |
|
|
61 | 78 |
class IOServer(SocketServer.UnixStreamServer): |
62 | 79 |
"""IO thread class. |
63 | 80 |
|
... | ... | |
81 | 98 |
|
82 | 99 |
# We'll only start threads once we've forked. |
83 | 100 |
self.jobqueue = None |
101 |
self.request_workers = None |
|
84 | 102 |
|
85 | 103 |
signal.signal(signal.SIGINT, self.handle_quit_signals) |
86 | 104 |
signal.signal(signal.SIGTERM, self.handle_quit_signals) |
87 | 105 |
|
88 | 106 |
def setup_queue(self): |
89 | 107 |
self.jobqueue = jqueue.JobQueue(self.context) |
90 |
|
|
91 |
def process_request_thread(self, request, client_address): |
|
92 |
"""Process the request. |
|
93 |
|
|
94 |
This is copied from the code in ThreadingMixIn. |
|
95 |
|
|
96 |
""" |
|
97 |
try: |
|
98 |
self.finish_request(request, client_address) |
|
99 |
self.close_request(request) |
|
100 |
except: |
|
101 |
self.handle_error(request, client_address) |
|
102 |
self.close_request(request) |
|
108 |
self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS, |
|
109 |
ClientRequestWorker) |
|
103 | 110 |
|
104 | 111 |
def process_request(self, request, client_address): |
105 |
"""Start a new thread to process the request. |
|
106 |
|
|
107 |
This is copied from the coode in ThreadingMixIn. |
|
112 |
"""Add task to workerpool to process request. |
|
108 | 113 |
|
109 | 114 |
""" |
110 |
t = threading.Thread(target=self.process_request_thread, |
|
111 |
args=(request, client_address)) |
|
112 |
t.start() |
|
115 |
self.request_workers.AddTask(self, request, client_address) |
|
113 | 116 |
|
114 | 117 |
def handle_quit_signals(self, signum, frame): |
115 | 118 |
print "received %s in %s" % (signum, frame) |
... | ... | |
132 | 135 |
self.server_close() |
133 | 136 |
utils.RemoveFile(constants.MASTER_SOCKET) |
134 | 137 |
finally: |
138 |
if self.request_workers: |
|
139 |
self.request_workers.Shutdown() |
|
135 | 140 |
if self.jobqueue: |
136 | 141 |
self.jobqueue.Shutdown() |
137 | 142 |
|
Also available in: Unified diff