Revision 50a3fbb2
b/daemons/ganeti-masterd | ||
---|---|---|
82 | 82 |
self.queue = jqueue.QueueManager() |
83 | 83 |
self.context = context |
84 | 84 |
self.processors = [] |
85 |
|
|
86 |
# We'll only start threads once we've forked. |
|
87 |
self.jobqueue = None |
|
88 |
|
|
85 | 89 |
signal.signal(signal.SIGINT, self.handle_quit_signals) |
86 | 90 |
signal.signal(signal.SIGTERM, self.handle_quit_signals) |
87 | 91 |
|
92 |
def setup_queue(self): |
|
93 |
self.jobqueue = jqueue.JobQueue(self.context) |
|
94 |
|
|
88 | 95 |
def setup_processors(self): |
89 | 96 |
"""Spawn the processors threads. |
90 | 97 |
|
... | ... | |
140 | 147 |
socket. |
141 | 148 |
|
142 | 149 |
""" |
143 |
self.server_close() |
|
144 |
utils.RemoveFile(constants.MASTER_SOCKET) |
|
145 |
for i in range(self.QUEUE_PROCESSOR_SIZE): |
|
146 |
self.queue.new_queue.put(None) |
|
147 |
for idx, t in enumerate(self.processors): |
|
148 |
logging.debug("waiting for processor thread %s...", idx) |
|
149 |
t.join() |
|
150 |
logging.debug("threads done") |
|
150 |
try: |
|
151 |
self.server_close() |
|
152 |
utils.RemoveFile(constants.MASTER_SOCKET) |
|
153 |
for i in range(self.QUEUE_PROCESSOR_SIZE): |
|
154 |
self.queue.new_queue.put(None) |
|
155 |
for idx, t in enumerate(self.processors): |
|
156 |
logging.debug("waiting for processor thread %s...", idx) |
|
157 |
t.join() |
|
158 |
logging.debug("threads done") |
|
159 |
finally: |
|
160 |
if self.jobqueue: |
|
161 |
self.jobqueue.Shutdown() |
|
151 | 162 |
|
152 | 163 |
|
153 | 164 |
class ClientRqHandler(SocketServer.BaseRequestHandler): |
... | ... | |
419 | 430 |
|
420 | 431 |
try: |
421 | 432 |
master.setup_processors() |
433 |
master.setup_queue() |
|
422 | 434 |
try: |
423 | 435 |
master.serve_forever() |
424 | 436 |
finally: |
Also available in: Unified diff