Revision e6209aa2 logic/dispatcher.py

b/logic/dispatcher.py
29 29
# policies, either expressed or implied, of GRNET S.A.
30 30

  
31 31

  
32
""" Message queue setup and dispatch
32
""" Message queue setup, dispatch and admin
33 33

  
34 34
This program sets up connections to the queues configured in settings.py
35 35
and implements the message wait and dispatch loops. Actual messages are
......
177 177
    parser = OptionParser()
178 178
    parser.add_option("-d", "--debug", action="store_true", default=False,
179 179
                      dest="debug", help="Enable debug mode")
180
    parser.add_option("-w", "--workers", default=2, dest="workers",
181
                      help="Number of workers to spawn", type="int")
182
    parser.add_option("-p", '--pid-file', dest="pid_file",
183
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
184
                      help="Save PID to file (default:%s)" %
185
                           os.path.join(os.getcwd(), "dispatcher.pid"))
180 186
    parser.add_option("--purge-queues", action="store_true",
181 187
                      default=False, dest="purge_queues",
182 188
                      help="Remove all declared queues (DANGEROUS!)")
......
184 190
                      default=False, dest="purge_exchanges",
185 191
                      help="Remove all exchanges. Implies deleting all queues \
186 192
                           first (DANGEROUS!)")
187
    parser.add_option("--drain-queue", dest="clean_queue",
188
                      help="Acks and removes all messages from a queue")
189
    parser.add_option("-w", "--workers", default=2, dest="workers",
190
                      help="Number of workers to spawn", type="int")
191
    parser.add_option("-p", '--pid-file', dest="pid_file",
192
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
193
                      help="Save PID to file (default:%s)" %
194
                           os.path.join(os.getcwd(), "dispatcher.pid"))
193
    parser.add_option("--drain-queue", dest="queue",
194
                      help="Strips a queue from all outstanding messages")
195 195

  
196 196
    return parser.parse_args(args)
197 197

  
......
260 260
    conn = get_connection()
261 261
    chan = conn.channel()
262 262

  
263
    # Register a temporary queue binding
264
    for binding in settings.BINDINGS:
265
        if binding[0] == queue:
266
            exch = binding[1]
267

  
268
    if not exch:
269
        print "Queue not bound to any exchange: %s" % queue
270
        return
271

  
272
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
273
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
274

  
275
    print "Queue draining about to start, hit Ctrl+c when done"
276
    time.sleep(2)
277
    print "Queue draining starting"
278

  
279
    signal(SIGTERM, _exit_handler)
280
    signal(SIGINT, _exit_handler)
281

  
282
    while True:
283
        chan.wait()
284
    chan.basic_cancel(tag)
263 285
    chan.connection.close()
264 286

  
265 287
def get_connection():
......
304 326
        return
305 327

  
306 328
    if opts.clean_queue:
307
        drain_queue(opts.clean_queue)
329
        drain_queue(opts.queue)
308 330
        return
309 331

  
310 332
    # Debug mode, process messages without spawning workers

Also available in: Unified diff