Revision 659de616
b/snf-cyclades-app/synnefo/app_settings/default/queues.py | ||
---|---|---|
9 | 9 |
AMQP_BACKEND = 'puka' |
10 | 10 |
|
11 | 11 |
EXCHANGE_GANETI = "ganeti" # Messages from Ganeti |
12 |
EXCHANGE_CRON = "cron" # Messages from periodically triggered tasks |
|
13 |
EXCHANGE_API = "api" # Messages from the REST API |
|
14 |
EXCHANGES = (EXCHANGE_GANETI, EXCHANGE_CRON, EXCHANGE_API) |
b/snf-cyclades-app/synnefo/logic/dispatcher.py | ||
---|---|---|
62 | 62 |
|
63 | 63 |
from synnefo.lib.amqp import AMQPClient |
64 | 64 |
from synnefo.logic import callbacks |
65 |
from synnefo.logic import queues |
|
65 | 66 |
|
66 | 67 |
import logging |
67 | 68 |
log = logging.getLogger() |
68 | 69 |
|
69 |
# Queue names |
|
70 |
QUEUES = [] |
|
71 |
|
|
72 |
# Queue bindings to exchanges |
|
73 |
BINDINGS = [] |
|
74 |
|
|
75 | 70 |
|
76 | 71 |
class Dispatcher: |
77 | 72 |
debug = False |
... | ... | |
106 | 101 |
self.client.close() |
107 | 102 |
|
108 | 103 |
def _init(self): |
109 |
global QUEUES, BINDINGS |
|
110 | 104 |
log.info("Initializing") |
111 | 105 |
|
112 | 106 |
self.client = AMQPClient() |
... | ... | |
114 | 108 |
self.client.connect() |
115 | 109 |
|
116 | 110 |
# Declare queues and exchanges |
117 |
for exchange in settings.EXCHANGES: |
|
118 |
self.client.exchange_declare(exchange=exchange, |
|
119 |
type="topic") |
|
120 |
|
|
121 |
for queue in QUEUES: |
|
111 |
exchange = settings.EXCHANGE_GANETI |
|
112 |
exchange_dl = queues.convert_exchange_to_dead(exchange) |
|
113 |
self.client.exchange_declare(exchange=exchange, |
|
114 |
type="topic") |
|
115 |
self.client.exchange_declare(exchange=exchange_dl, |
|
116 |
type="topic") |
|
117 |
|
|
118 |
for queue in queues.QUEUES: |
|
122 | 119 |
# Queues are mirrored to all RabbitMQ brokers |
123 |
self.client.queue_declare(queue=queue, mirrored=True) |
|
124 |
|
|
125 |
bindings = BINDINGS |
|
120 |
self.client.queue_declare(queue=queue, mirrored=True, |
|
121 |
dead_letter_exchange=exchange_dl) |
|
122 |
# Declare the corresponding dead-letter queue |
|
123 |
queue_dl = queues.convert_queue_to_dead(queue) |
|
124 |
self.client.queue_declare(queue=queue_dl, mirrored=True) |
|
126 | 125 |
|
127 | 126 |
# Bind queues to handler methods |
128 |
for binding in bindings:
|
|
127 |
for binding in queues.BINDINGS:
|
|
129 | 128 |
try: |
130 | 129 |
callback = getattr(callbacks, binding[3]) |
131 | 130 |
except AttributeError: |
132 | 131 |
log.error("Cannot find callback %s", binding[3]) |
133 | 132 |
raise SystemExit(1) |
133 |
queue = binding[0] |
|
134 |
exchange = binding[1] |
|
135 |
routing_key = binding[2] |
|
134 | 136 |
|
135 |
self.client.queue_bind(queue=binding[0], exchange=binding[1],
|
|
136 |
routing_key=binding[2])
|
|
137 |
self.client.queue_bind(queue=queue, exchange=exchange,
|
|
138 |
routing_key=routing_key)
|
|
137 | 139 |
|
138 | 140 |
self.client.basic_consume(queue=binding[0], |
139 | 141 |
callback=callback, |
140 | 142 |
prefetch_count=5) |
141 | 143 |
|
144 |
queue_dl = queues.convert_queue_to_dead(queue) |
|
145 |
exchange_dl = queues.convert_exchange_to_dead(exchange) |
|
146 |
# Bind the corresponding dead-letter queue |
|
147 |
self.client.queue_bind(queue=queue_dl, |
|
148 |
exchange=exchange_dl, |
|
149 |
routing_key=routing_key) |
|
150 |
|
|
142 | 151 |
log.debug("Binding %s(%s) to queue %s with handler %s", |
143 |
binding[1], binding[2], binding[0], binding[3]) |
|
144 |
|
|
145 |
|
|
146 |
def _init_queues(): |
|
147 |
global QUEUES, BINDINGS |
|
148 |
|
|
149 |
# Queue declarations |
|
150 |
prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
|
151 |
|
|
152 |
QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix |
|
153 |
QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix |
|
154 |
QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix |
|
155 |
QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix |
|
156 |
QUEUE_RECONC = "%s-reconciliation" % prefix |
|
157 |
if settings.DEBUG is True: |
|
158 |
QUEUE_DEBUG = "%s-debug" % prefix # Debug queue, retrieves all messages |
|
159 |
|
|
160 |
QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC, |
|
161 |
QUEUE_GANETI_BUILD_PROGR) |
|
162 |
|
|
163 |
# notifications of type "ganeti-op-status" |
|
164 |
DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix |
|
165 |
# notifications of type "ganeti-network-status" |
|
166 |
DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix |
|
167 |
# notifications of type "ganeti-net-status" |
|
168 |
DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix |
|
169 |
# notifications of type "ganeti-create-progress" |
|
170 |
BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix |
|
171 |
# reconciliation |
|
172 |
RECONC_HANDLER = 'reconciliation.%s.*' % prefix |
|
173 |
|
|
174 |
BINDINGS = [ |
|
175 |
# Queue # Exchange # RouteKey # Handler |
|
176 |
(QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'), |
|
177 |
(QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'), |
|
178 |
(QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'), |
|
179 |
(QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'), |
|
180 |
] |
|
181 |
|
|
182 |
if settings.DEBUG is True: |
|
183 |
BINDINGS += [ |
|
184 |
# Queue # Exchange # RouteKey # Handler |
|
185 |
(QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
|
186 |
(QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
|
187 |
(QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
|
188 |
] |
|
189 |
QUEUES += (QUEUE_DEBUG,) |
|
152 |
exchange, routing_key, queue, binding[3]) |
|
190 | 153 |
|
191 | 154 |
|
192 | 155 |
def parse_arguments(args): |
... | ... | |
219 | 182 |
""" |
220 | 183 |
Delete declared queues from RabbitMQ. Use with care! |
221 | 184 |
""" |
222 |
global QUEUES, BINDINGS |
|
223 | 185 |
client = AMQPClient(max_retries=120) |
224 | 186 |
client.connect() |
225 | 187 |
|
226 |
print "Queues to be deleted: ", QUEUES |
|
188 |
print "Queues to be deleted: ", queues.QUEUES
|
|
227 | 189 |
|
228 | 190 |
if not get_user_confirmation(): |
229 | 191 |
return |
230 | 192 |
|
231 |
for queue in QUEUES: |
|
193 |
for queue in queues.QUEUES:
|
|
232 | 194 |
result = client.queue_delete(queue=queue) |
233 | 195 |
print "Deleting queue %s. Result: %s" % (queue, result) |
234 | 196 |
|
... | ... | |
237 | 199 |
|
238 | 200 |
def purge_exchanges(): |
239 | 201 |
"""Delete declared exchanges from RabbitMQ, after removing all queues""" |
240 |
global QUEUES, BINDINGS |
|
241 | 202 |
purge_queues() |
242 | 203 |
|
243 | 204 |
client = AMQPClient() |
244 | 205 |
client.connect() |
245 | 206 |
|
246 |
print "Exchanges to be deleted: ", settings.EXCHANGES |
|
207 |
exchanges = queues.EXCHANGES |
|
208 |
print "Exchanges to be deleted: ", exchanges |
|
247 | 209 |
|
248 | 210 |
if not get_user_confirmation(): |
249 | 211 |
return |
250 | 212 |
|
251 |
for exchange in settings.EXCHANGES: |
|
252 |
result = client.exchange_delete(exchange=exchange) |
|
253 |
print "Deleting exchange %s. Result: %s" % (exchange, result) |
|
254 |
|
|
213 |
for exch in exchanges: |
|
214 |
result = client.exchange_delete(exchange=exch) |
|
215 |
print "Deleting exchange %s. Result: %s" % (exch, result) |
|
255 | 216 |
client.close() |
256 | 217 |
|
257 | 218 |
|
258 | 219 |
def drain_queue(queue): |
259 | 220 |
"""Strip a (declared) queue from all outstanding messages""" |
260 |
global QUEUES, BINDINGS |
|
261 | 221 |
if not queue: |
262 | 222 |
return |
263 | 223 |
|
264 |
if not queue in QUEUES: |
|
224 |
if not queue in queues.QUEUES:
|
|
265 | 225 |
print "Queue %s not configured" % queue |
266 | 226 |
return |
267 | 227 |
|
... | ... | |
336 | 296 |
setproctitle.setproctitle(sys.argv[0]) |
337 | 297 |
setup_logging(opts) |
338 | 298 |
|
339 |
# Init the global variables containing the queues |
|
340 |
_init_queues() |
|
341 |
|
|
342 | 299 |
# Special case for the clean up queues action |
343 | 300 |
if opts.purge_queues: |
344 | 301 |
purge_queues() |
b/snf-cyclades-app/synnefo/logic/queues.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or without |
|
4 |
# modification, are permitted provided that the following conditions |
|
5 |
# are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above copyright |
|
8 |
# notice, this list of conditions and the following disclaimer. |
|
9 |
# |
|
10 |
# 2. Redistributions in binary form must reproduce the above copyright |
|
11 |
# notice, this list of conditions and the following disclaimer in the |
|
12 |
# documentation and/or other materials provided with the distribution. |
|
13 |
# |
|
14 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND |
|
15 |
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
16 |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
|
17 |
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE |
|
18 |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
|
19 |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS |
|
20 |
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
|
21 |
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
22 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY |
|
23 |
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
|
24 |
# SUCH DAMAGE. |
|
25 |
# |
|
26 |
# The views and conclusions contained in the software and documentation are |
|
27 |
# those of the authors and should not be interpreted as representing official |
|
28 |
# policies, either expressed or implied, of GRNET S.A. |
|
29 |
|
|
30 |
|
|
31 |
from synnefo.settings import BACKEND_PREFIX_ID, DEBUG, EXCHANGE_GANETI |
|
32 |
|
|
33 |
try: |
|
34 |
prefix = BACKEND_PREFIX_ID.split('-')[0] |
|
35 |
except TypeError, IndexError: |
|
36 |
raise Exception("Invalid BACKEND_PREFIX_ID") |
|
37 |
|
|
38 |
# EXCHANGES |
|
39 |
EXCHANGES = (EXCHANGE_GANETI,) |
|
40 |
|
|
41 |
|
|
42 |
# QUEUES |
|
43 |
QUEUE_OP = "%s-events-op" % prefix |
|
44 |
QUEUE_NETWORK = "%s-events-network" % prefix |
|
45 |
QUEUE_NET = "%s-events-net" % prefix |
|
46 |
QUEUE_PROGRESS = "%s-events-progress" % prefix |
|
47 |
|
|
48 |
|
|
49 |
QUEUES = (QUEUE_OP, |
|
50 |
QUEUE_NETWORK, |
|
51 |
QUEUE_NET, |
|
52 |
QUEUE_PROGRESS) |
|
53 |
|
|
54 |
# ROUTING KEYS |
|
55 |
# notifications of type "ganeti-op-status" |
|
56 |
KEY_OP = 'ganeti.%s.event.op' % prefix |
|
57 |
# notifications of type "ganeti-network-status" |
|
58 |
KEY_NETWORK = 'ganeti.%s.event.network' % prefix |
|
59 |
# notifications of type "ganeti-net-status" |
|
60 |
KEY_NET = 'ganeti.%s.event.net' % prefix |
|
61 |
# notifications of type "ganeti-create-progress" |
|
62 |
KEY_PROGRESS = 'ganeti.%s.event.progress' % prefix |
|
63 |
|
|
64 |
# BINDINGS: |
|
65 |
BINDINGS = ( |
|
66 |
# Queue # Exchange # RouteKey # Handler |
|
67 |
(QUEUE_OP, EXCHANGE_GANETI, KEY_OP, 'update_db'), |
|
68 |
(QUEUE_NETWORK, EXCHANGE_GANETI, KEY_NETWORK, 'update_network'), |
|
69 |
(QUEUE_NET, EXCHANGE_GANETI, KEY_NET, 'update_net'), |
|
70 |
(QUEUE_PROGRESS, EXCHANGE_GANETI, KEY_PROGRESS, 'update_build_progress'), |
|
71 |
) |
|
72 |
|
|
73 |
|
|
74 |
## Extra for DEBUG: |
|
75 |
if DEBUG is True: |
|
76 |
# Debug queue, retrieves all messages |
|
77 |
QUEUE_DEBUG = "%s-debug" % prefix |
|
78 |
QUEUES += (QUEUE_DEBUG,) |
|
79 |
BINDINGS += ((QUEUE_DEBUG, EXCHANGE_GANETI, "#", "dummy_proc"),) |
|
80 |
|
|
81 |
|
|
82 |
def convert_queue_to_dead(queue): |
|
83 |
"""Convert the name of a queue to the corresponding dead-letter one""" |
|
84 |
return queue + "-dl" |
|
85 |
|
|
86 |
|
|
87 |
def convert_exchange_to_dead(exchange): |
|
88 |
"""Convert the name of an exchange to the corresponding dead-letter one""" |
|
89 |
return exchange + "-dl" |
Also available in: Unified diff