Revision 2bf8d695
b/logic/dispatcher.py | ||
---|---|---|
70 | 70 |
# Queue bindings to exchanges |
71 | 71 |
BINDINGS = [] |
72 | 72 |
|
73 |
|
|
73 | 74 |
class Dispatcher: |
74 | 75 |
|
75 | 76 |
logger = None |
... | ... | |
77 | 78 |
debug = False |
78 | 79 |
clienttags = [] |
79 | 80 |
|
80 |
def __init__(self, debug = False):
|
|
81 |
def __init__(self, debug=False):
|
|
81 | 82 |
|
82 | 83 |
# Initialize logger |
83 | 84 |
self.logger = log.get_logger('synnefo.dispatcher') |
... | ... | |
119 | 120 |
password=settings.RABBIT_PASSWORD, |
120 | 121 |
virtual_host=settings.RABBIT_VHOST) |
121 | 122 |
except socket.error: |
122 |
self.logger.error("Failed to connect to %s, retrying in 10s...",
|
|
123 |
self.logger.error("Failed to connect to %s, retrying in 10s", |
|
123 | 124 |
settings.RABBIT_HOST) |
124 | 125 |
time.sleep(10) |
125 | 126 |
|
... | ... | |
173 | 174 |
QUEUE_GANETI_BUILD_PROGR) |
174 | 175 |
|
175 | 176 |
# notifications of type "ganeti-op-status" |
176 |
DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix |
|
177 |
DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
|
|
177 | 178 |
# notifications of type "ganeti-net-status" |
178 |
DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix |
|
179 |
DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
|
|
179 | 180 |
# notifications of type "ganeti-create-progress" |
180 | 181 |
BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix |
181 | 182 |
|
182 | 183 |
BINDINGS = [ |
183 |
# Queue # Exchange # RouteKey # Handler |
|
184 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'), |
|
185 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'), |
|
186 |
(QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'), |
|
187 |
(QUEUE_CRON_CREDITS, settings.EXCHANGE_CRON, '*.credits.*', 'update_credits'), |
|
188 |
(QUEUE_EMAIL, settings.EXCHANGE_API, '*.email.*', 'send_email'), |
|
189 |
(QUEUE_EMAIL, settings.EXCHANGE_CRON, '*.email.*', 'send_email'), |
|
190 |
(QUEUE_RECONC, settings.EXCHANGE_CRON, 'reconciliation.*', 'trigger_status_update'), |
|
184 |
# Queue # Exchange # RouteKey |
|
185 |
# Handler |
|
186 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, |
|
187 |
'update_db'), |
|
188 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, |
|
189 |
'update_net'), |
|
190 |
(QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, |
|
191 |
'update_build_progress'), |
|
192 |
(QUEUE_CRON_CREDITS, settings.EXCHANGE_CRON, '*.credits.*', |
|
193 |
'update_credits'), |
|
194 |
(QUEUE_EMAIL, settings.EXCHANGE_API, '*.email.*', |
|
195 |
'send_email'), |
|
196 |
(QUEUE_EMAIL, settings.EXCHANGE_CRON, '*.email.*', |
|
197 |
'send_email'), |
|
198 |
(QUEUE_RECONC, settings.EXCHANGE_CRON, 'reconciliation.*', |
|
199 |
'trigger_status_update'), |
|
191 | 200 |
] |
192 | 201 |
|
193 | 202 |
if settings.DEBUG is True: |
... | ... | |
323 | 332 |
print "Queue not bound to any exchange: %s" % queue |
324 | 333 |
return |
325 | 334 |
|
326 |
chan.queue_bind(queue=queue, exchange=exch,routing_key='#') |
|
335 |
chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
|
|
327 | 336 |
tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
328 | 337 |
|
329 | 338 |
print "Queue draining about to start, hit Ctrl+c when done" |
... | ... | |
362 | 371 |
|
363 | 372 |
|
364 | 373 |
def debug_mode(): |
365 |
disp = Dispatcher(debug = True)
|
|
374 |
disp = Dispatcher(debug=True)
|
|
366 | 375 |
signal(SIGINT, _exit_handler) |
367 | 376 |
signal(SIGTERM, _exit_handler) |
368 | 377 |
|
Also available in: Unified diff