Revision 979482ce
b/logic/dispatcher.py | ||
---|---|---|
177 | 177 |
parser = OptionParser() |
178 | 178 |
parser.add_option("-d", "--debug", action="store_true", default=False, |
179 | 179 |
dest="debug", help="Enable debug mode") |
180 |
parser.add_option("-c", "--cleanup-queues", action="store_true",
|
|
181 |
default=False, dest="cleanup_queues",
|
|
180 |
parser.add_option("--purge-queues", action="store_true",
|
|
181 |
default=False, dest="purge_queues",
|
|
182 | 182 |
help="Remove all declared queues (DANGEROUS!)") |
183 |
parser.add_option("--purge-exchanges", action="store_true", |
|
184 |
default=False, dest="purge_exchanges", |
|
185 |
help="Remove all exchanges. Implies deleting all queues \ |
|
186 |
first (DANGEROUS!)") |
|
187 |
parser.add_option("--drain-queue", dest="clean_queue", |
|
188 |
help="Acks and removes all messages from a queue") |
|
183 | 189 |
parser.add_option("-w", "--workers", default=2, dest="workers", |
184 | 190 |
help="Number of workers to spawn", type="int") |
185 | 191 |
parser.add_option("-p", '--pid-file', dest="pid_file", |
... | ... | |
190 | 196 |
return parser.parse_args(args) |
191 | 197 |
|
192 | 198 |
|
193 |
def cleanup_queues() : |
|
194 |
"""Delete declared queues from RabbitMQ. Use with care!""" |
|
195 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
196 |
userid=settings.RABBIT_USERNAME, |
|
197 |
password=settings.RABBIT_PASSWORD, |
|
198 |
virtual_host=settings.RABBIT_VHOST) |
|
199 |
def purge_queues() : |
|
200 |
""" |
|
201 |
Delete declared queues from RabbitMQ. Use with care! |
|
202 |
""" |
|
203 |
conn = get_connection() |
|
199 | 204 |
chan = conn.channel() |
200 | 205 |
|
201 | 206 |
print "Queues to be deleted: ", settings.QUEUES |
202 |
print "Exchnages to be deleted: ", settings.EXCHANGES |
|
203 |
ans = raw_input("Are you sure (N/y):") |
|
204 | 207 |
|
205 |
if not ans: |
|
206 |
return |
|
207 |
if ans not in ['Y', 'y']: |
|
208 |
if not get_user_confirmation(): |
|
208 | 209 |
return |
209 | 210 |
|
210 |
#for exchange in settings.EXCHANGES: |
|
211 |
# try: |
|
212 |
# chan.exchange_delete(exchange=exchange) |
|
213 |
# except amqp.exceptions.AMQPChannelException as e: |
|
214 |
# print e.amqp_reply_code, " ", e.amqp_reply_text |
|
215 |
|
|
216 | 211 |
for queue in settings.QUEUES: |
217 | 212 |
try: |
218 | 213 |
chan.queue_delete(queue=queue) |
214 |
print "Deleting queue %s" % queue |
|
215 |
except amqp.exceptions.AMQPChannelException as e: |
|
216 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
|
217 |
chan = conn.channel() |
|
218 |
|
|
219 |
chan.connection.close() |
|
220 |
|
|
221 |
|
|
222 |
def purge_exchanges(): |
|
223 |
""" |
|
224 |
Delete declared exchanges from RabbitMQ, after removing all queues first |
|
225 |
""" |
|
226 |
purge_queues() |
|
227 |
|
|
228 |
conn = get_connection() |
|
229 |
chan = conn.channel() |
|
230 |
|
|
231 |
print "Exchnages to be deleted: ", settings.EXCHANGES |
|
232 |
|
|
233 |
if not get_user_confirmation(): |
|
234 |
return |
|
235 |
|
|
236 |
for exchange in settings.EXCHANGES: |
|
237 |
try: |
|
238 |
chan.exchange_delete(exchange=exchange) |
|
219 | 239 |
except amqp.exceptions.AMQPChannelException as e: |
220 | 240 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
221 |
chan.close() |
|
241 |
|
|
222 | 242 |
chan.connection.close() |
223 | 243 |
|
224 | 244 |
|
245 |
def drain_queue(queue): |
|
246 |
""" |
|
247 |
Strip a (declared) queue from all outstanding messages |
|
248 |
""" |
|
249 |
if not queue: |
|
250 |
return |
|
251 |
|
|
252 |
if not queue in settings.QUEUES: |
|
253 |
print "Queue %s not configured" % queue |
|
254 |
return |
|
255 |
|
|
256 |
print "Queue to be drained: %s" % queue |
|
257 |
|
|
258 |
if not get_user_confirmation(): |
|
259 |
return |
|
260 |
conn = get_connection() |
|
261 |
chan = conn.channel() |
|
262 |
|
|
263 |
chan.connection.close() |
|
264 |
|
|
265 |
def get_connection(): |
|
266 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
267 |
userid=settings.RABBIT_USERNAME, |
|
268 |
password=settings.RABBIT_PASSWORD, |
|
269 |
virtual_host=settings.RABBIT_VHOST) |
|
270 |
return conn |
|
271 |
|
|
272 |
def get_user_confirmation(): |
|
273 |
ans = raw_input("Are you sure (N/y):") |
|
274 |
|
|
275 |
if not ans: |
|
276 |
return False |
|
277 |
if ans not in ['Y', 'y']: |
|
278 |
return False |
|
279 |
return True |
|
280 |
|
|
281 |
|
|
225 | 282 |
def debug_mode(): |
226 | 283 |
disp = Dispatcher(debug = True) |
227 | 284 |
signal(SIGINT, _exit_handler) |
... | ... | |
237 | 294 |
logger = log.get_logger("synnefo.dispatcher") |
238 | 295 |
|
239 | 296 |
# Special case for the clean up queues action |
240 |
if opts.cleanup_queues: |
|
241 |
cleanup_queues() |
|
297 |
if opts.purge_queues: |
|
298 |
purge_queues() |
|
299 |
return |
|
300 |
|
|
301 |
# Special case for the clean up exch action |
|
302 |
if opts.purge_exchanges: |
|
303 |
purge_exchanges() |
|
304 |
return |
|
305 |
|
|
306 |
if opts.clean_queue: |
|
307 |
drain_queue(opts.clean_queue) |
|
242 | 308 |
return |
243 | 309 |
|
244 | 310 |
# Debug mode, process messages without spawning workers |
Also available in: Unified diff