Revision 698d0666 logic/dispatcher.py
b/logic/dispatcher.py | ||
---|---|---|
62 | 62 |
|
63 | 63 |
from synnefo.logic import callbacks |
64 | 64 |
|
65 |
# Queue names |
|
66 |
QUEUES = [] |
|
67 |
|
|
68 |
# Queue bindings to exchanges |
|
69 |
BINDINGS = [] |
|
70 |
|
|
65 | 71 |
class Dispatcher: |
66 | 72 |
|
67 | 73 |
logger = None |
... | ... | |
95 | 101 |
self.chan.close() |
96 | 102 |
|
97 | 103 |
def _init(self): |
104 |
global QUEUES, BINDINGS |
|
98 | 105 |
self.logger.info("Initializing") |
99 | 106 |
|
100 |
# Queue declarations |
|
101 |
prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
|
102 |
|
|
103 |
QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix |
|
104 |
QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix |
|
105 |
QUEUE_CRON_CREDITS = "%s-credits" % prefix |
|
106 |
QUEUE_EMAIL = "%s-email" % prefix |
|
107 |
QUEUE_RECONC = "%s-reconciliation" % prefix |
|
108 |
if settings.DEBUG is True: |
|
109 |
QUEUE_DEBUG = "debug" # Debug queue, retrieves all messages |
|
110 |
|
|
111 |
QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, |
|
112 |
QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC) |
|
113 |
|
|
114 |
if settings.DEBUG is True: |
|
115 |
BINDINGS_DEBUG = [ |
|
116 |
# Queue # Exchange # RouteKey # Handler |
|
117 |
(QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
|
118 |
(QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
|
119 |
(QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
|
120 |
] |
|
121 |
QUEUES += (QUEUE_DEBUG,) |
|
122 |
|
|
123 |
# notifications of type "ganeti-op-status" |
|
124 |
DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix |
|
125 |
# notifications of type "ganeti-net-status" |
|
126 |
DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix |
|
127 |
|
|
128 |
BINDINGS = [ |
|
129 |
# Queue # Exchange # RouteKey # Handler |
|
130 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'), |
|
131 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'), |
|
132 |
(QUEUE_CRON_CREDITS, settings.EXCHANGE_CRON, '*.credits.*', 'update_credits'), |
|
133 |
(QUEUE_EMAIL, settings.EXCHANGE_API, '*.email.*', 'send_email'), |
|
134 |
(QUEUE_EMAIL, settings.EXCHANGE_CRON, '*.email.*', 'send_email'), |
|
135 |
(QUEUE_RECONC, settings.EXCHANGE_CRON, 'reconciliation.*', 'trigger_status_update'), |
|
136 |
] |
|
137 |
|
|
138 | 107 |
# Connect to RabbitMQ |
139 | 108 |
conn = None |
140 | 109 |
while conn == None: |
... | ... | |
162 | 131 |
|
163 | 132 |
bindings = BINDINGS |
164 | 133 |
|
165 |
# Special queue for debugging, should not appear in production |
|
166 |
if self.debug and settings.DEBUG: |
|
167 |
self.chan.queue_declare(queue=QUEUE_DEBUG, durable=True, |
|
168 |
exclusive=False, auto_delete=False) |
|
169 |
bindings += BINDINGS_DEBUG |
|
170 |
|
|
171 | 134 |
# Bind queues to handler methods |
172 | 135 |
for binding in bindings: |
173 | 136 |
try: |
... | ... | |
184 | 147 |
self.clienttags.append(tag) |
185 | 148 |
|
186 | 149 |
|
150 |
def _init_queues(): |
|
151 |
global QUEUES, BINDINGS |
|
152 |
|
|
153 |
# Queue declarations |
|
154 |
prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
|
155 |
|
|
156 |
QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix |
|
157 |
QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix |
|
158 |
QUEUE_CRON_CREDITS = "%s-credits" % prefix |
|
159 |
QUEUE_EMAIL = "%s-email" % prefix |
|
160 |
QUEUE_RECONC = "%s-reconciliation" % prefix |
|
161 |
if settings.DEBUG is True: |
|
162 |
QUEUE_DEBUG = "debug" # Debug queue, retrieves all messages |
|
163 |
|
|
164 |
QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, |
|
165 |
QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC) |
|
166 |
|
|
167 |
# notifications of type "ganeti-op-status" |
|
168 |
DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix |
|
169 |
# notifications of type "ganeti-net-status" |
|
170 |
DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix |
|
171 |
|
|
172 |
BINDINGS = [ |
|
173 |
# Queue # Exchange # RouteKey # Handler |
|
174 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'), |
|
175 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'), |
|
176 |
(QUEUE_CRON_CREDITS, settings.EXCHANGE_CRON, '*.credits.*', 'update_credits'), |
|
177 |
(QUEUE_EMAIL, settings.EXCHANGE_API, '*.email.*', 'send_email'), |
|
178 |
(QUEUE_EMAIL, settings.EXCHANGE_CRON, '*.email.*', 'send_email'), |
|
179 |
(QUEUE_RECONC, settings.EXCHANGE_CRON, 'reconciliation.*', 'trigger_status_update'), |
|
180 |
] |
|
181 |
|
|
182 |
if settings.DEBUG is True: |
|
183 |
BINDINGS += [ |
|
184 |
# Queue # Exchange # RouteKey # Handler |
|
185 |
(QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
|
186 |
(QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
|
187 |
(QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
|
188 |
] |
|
189 |
QUEUES += (QUEUE_DEBUG,) |
|
190 |
|
|
191 |
|
|
187 | 192 |
def _exit_handler(signum, frame): |
188 | 193 |
""""Catch exit signal in children processes.""" |
189 | 194 |
print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum) |
... | ... | |
237 | 242 |
""" |
238 | 243 |
Delete declared queues from RabbitMQ. Use with care! |
239 | 244 |
""" |
245 |
global QUEUES, BINDINGS |
|
240 | 246 |
conn = get_connection() |
241 | 247 |
chan = conn.channel() |
242 | 248 |
|
243 |
print "Queues to be deleted: ", settings.QUEUES
|
|
249 |
print "Queues to be deleted: ", QUEUES |
|
244 | 250 |
|
245 | 251 |
if not get_user_confirmation(): |
246 | 252 |
return |
247 | 253 |
|
248 |
for queue in settings.QUEUES:
|
|
254 |
for queue in QUEUES: |
|
249 | 255 |
try: |
250 | 256 |
chan.queue_delete(queue=queue) |
251 | 257 |
print "Deleting queue %s" % queue |
... | ... | |
260 | 266 |
""" |
261 | 267 |
Delete declared exchanges from RabbitMQ, after removing all queues first |
262 | 268 |
""" |
269 |
global QUEUES, BINDINGS |
|
263 | 270 |
purge_queues() |
264 | 271 |
|
265 | 272 |
conn = get_connection() |
... | ... | |
283 | 290 |
""" |
284 | 291 |
Strip a (declared) queue from all outstanding messages |
285 | 292 |
""" |
293 |
global QUEUES, BINDINGS |
|
286 | 294 |
if not queue: |
287 | 295 |
return |
288 | 296 |
|
289 |
if not queue in settings.QUEUES:
|
|
297 |
if not queue in QUEUES: |
|
290 | 298 |
print "Queue %s not configured" % queue |
291 | 299 |
return |
292 | 300 |
|
... | ... | |
298 | 306 |
chan = conn.channel() |
299 | 307 |
|
300 | 308 |
# Register a temporary queue binding |
301 |
for binding in settings.BINDINGS:
|
|
309 |
for binding in BINDINGS: |
|
302 | 310 |
if binding[0] == queue: |
303 | 311 |
exch = binding[1] |
304 | 312 |
|
... | ... | |
358 | 366 |
|
359 | 367 |
logger = log.get_logger("synnefo.dispatcher") |
360 | 368 |
|
369 |
# Init the global variables containing the queues |
|
370 |
_init_queues() |
|
371 |
|
|
361 | 372 |
# Special case for the clean up queues action |
362 | 373 |
if opts.purge_queues: |
363 | 374 |
purge_queues() |
Also available in: Unified diff