Revision e6209aa2
b/logic/dispatcher.py | ||
---|---|---|
29 | 29 |
# policies, either expressed or implied, of GRNET S.A. |
30 | 30 |
|
31 | 31 |
|
32 |
""" Message queue setup and dispatch
|
|
32 |
""" Message queue setup, dispatch and admin
|
|
33 | 33 |
|
34 | 34 |
This program sets up connections to the queues configured in settings.py |
35 | 35 |
and implements the message wait and dispatch loops. Actual messages are |
... | ... | |
177 | 177 |
parser = OptionParser() |
178 | 178 |
parser.add_option("-d", "--debug", action="store_true", default=False, |
179 | 179 |
dest="debug", help="Enable debug mode") |
180 |
parser.add_option("-w", "--workers", default=2, dest="workers", |
|
181 |
help="Number of workers to spawn", type="int") |
|
182 |
parser.add_option("-p", '--pid-file', dest="pid_file", |
|
183 |
default=os.path.join(os.getcwd(), "dispatcher.pid"), |
|
184 |
help="Save PID to file (default:%s)" % |
|
185 |
os.path.join(os.getcwd(), "dispatcher.pid")) |
|
180 | 186 |
parser.add_option("--purge-queues", action="store_true", |
181 | 187 |
default=False, dest="purge_queues", |
182 | 188 |
help="Remove all declared queues (DANGEROUS!)") |
... | ... | |
184 | 190 |
default=False, dest="purge_exchanges", |
185 | 191 |
help="Remove all exchanges. Implies deleting all queues \ |
186 | 192 |
first (DANGEROUS!)") |
187 |
parser.add_option("--drain-queue", dest="clean_queue", |
|
188 |
help="Acks and removes all messages from a queue") |
|
189 |
parser.add_option("-w", "--workers", default=2, dest="workers", |
|
190 |
help="Number of workers to spawn", type="int") |
|
191 |
parser.add_option("-p", '--pid-file', dest="pid_file", |
|
192 |
default=os.path.join(os.getcwd(), "dispatcher.pid"), |
|
193 |
help="Save PID to file (default:%s)" % |
|
194 |
os.path.join(os.getcwd(), "dispatcher.pid")) |
|
193 |
parser.add_option("--drain-queue", dest="queue", |
|
194 |
help="Strips a queue from all outstanding messages") |
|
195 | 195 |
|
196 | 196 |
return parser.parse_args(args) |
197 | 197 |
|
... | ... | |
260 | 260 |
conn = get_connection() |
261 | 261 |
chan = conn.channel() |
262 | 262 |
|
263 |
# Register a temporary queue binding |
|
264 |
for binding in settings.BINDINGS: |
|
265 |
if binding[0] == queue: |
|
266 |
exch = binding[1] |
|
267 |
|
|
268 |
if not exch: |
|
269 |
print "Queue not bound to any exchange: %s" % queue |
|
270 |
return |
|
271 |
|
|
272 |
chan.queue_bind(queue=queue, exchange=exch,routing_key='#') |
|
273 |
tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
|
274 |
|
|
275 |
print "Queue draining about to start, hit Ctrl+c when done" |
|
276 |
time.sleep(2) |
|
277 |
print "Queue draining starting" |
|
278 |
|
|
279 |
signal(SIGTERM, _exit_handler) |
|
280 |
signal(SIGINT, _exit_handler) |
|
281 |
|
|
282 |
while True: |
|
283 |
chan.wait() |
|
284 |
chan.basic_cancel(tag) |
|
263 | 285 |
chan.connection.close() |
264 | 286 |
|
265 | 287 |
def get_connection(): |
... | ... | |
304 | 326 |
return |
305 | 327 |
|
306 | 328 |
if opts.clean_queue: |
307 |
drain_queue(opts.clean_queue)
|
|
329 |
drain_queue(opts.queue) |
|
308 | 330 |
return |
309 | 331 |
|
310 | 332 |
# Debug mode, process messages without spawning workers |
Also available in: Unified diff