Revision bd444a11 logic/dispatcher.py
b/logic/dispatcher.py | ||
---|---|---|
96 | 96 |
|
97 | 97 |
def _init(self): |
98 | 98 |
self.logger.info("Initializing") |
99 |
|
|
99 |
|
|
100 |
# Queue declarations |
|
101 |
prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
|
102 |
|
|
103 |
QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix |
|
104 |
QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix |
|
105 |
QUEUE_CRON_CREDITS = "%s-credits" % prefix |
|
106 |
QUEUE_EMAIL = "%s-email" % prefix |
|
107 |
QUEUE_RECONC = "%s-reconciliation" % prefix |
|
108 |
if settings.DEBUG is True: |
|
109 |
QUEUE_DEBUG = "debug" # Debug queue, retrieves all messages |
|
110 |
|
|
111 |
QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, |
|
112 |
QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC) |
|
113 |
|
|
114 |
if settings.DEBUG is True: |
|
115 |
BINDINGS_DEBUG = [ |
|
116 |
# Queue # Exchange # RouteKey # Handler |
|
117 |
(QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
|
118 |
(QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
|
119 |
(QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
|
120 |
] |
|
121 |
QUEUES += (QUEUE_DEBUG,) |
|
122 |
|
|
123 |
# notifications of type "ganeti-op-status" |
|
124 |
DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix |
|
125 |
# notifications of type "ganeti-net-status" |
|
126 |
DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix |
|
127 |
|
|
128 |
BINDINGS = [ |
|
129 |
# Queue # Exchange # RouteKey # Handler |
|
130 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'), |
|
131 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'), |
|
132 |
(QUEUE_CRON_CREDITS, settings.EXCHANGE_CRON, '*.credits.*', 'update_credits'), |
|
133 |
(QUEUE_EMAIL, settings.EXCHANGE_API, '*.email.*', 'send_email'), |
|
134 |
(QUEUE_EMAIL, settings.EXCHANGE_CRON, '*.email.*', 'send_email'), |
|
135 |
(QUEUE_RECONC, settings.EXCHANGE_CRON, 'reconciliation.*', 'trigger_status_update'), |
|
136 |
] |
|
137 |
|
|
100 | 138 |
# Connect to RabbitMQ |
101 | 139 |
conn = None |
102 | 140 |
while conn == None: |
... | ... | |
118 | 156 |
self.chan.exchange_declare(exchange=exchange, type="topic", |
119 | 157 |
durable=True, auto_delete=False) |
120 | 158 |
|
121 |
for queue in settings.QUEUES:
|
|
159 |
for queue in QUEUES: |
|
122 | 160 |
self.chan.queue_declare(queue=queue, durable=True, |
123 | 161 |
exclusive=False, auto_delete=False) |
124 | 162 |
|
125 |
bindings = settings.BINDINGS
|
|
163 |
bindings = BINDINGS |
|
126 | 164 |
|
127 | 165 |
# Special queue for debugging, should not appear in production |
128 | 166 |
if self.debug and settings.DEBUG: |
129 |
self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
|
|
167 |
self.chan.queue_declare(queue=QUEUE_DEBUG, durable=True, |
|
130 | 168 |
exclusive=False, auto_delete=False) |
131 |
bindings += settings.BINDINGS_DEBUG
|
|
169 |
bindings += BINDINGS_DEBUG |
|
132 | 170 |
|
133 | 171 |
# Bind queues to handler methods |
134 | 172 |
for binding in bindings: |
Also available in: Unified diff