Revision 979482ce

b/logic/dispatcher.py
177 177
    parser = OptionParser()
178 178
    parser.add_option("-d", "--debug", action="store_true", default=False,
179 179
                      dest="debug", help="Enable debug mode")
180
    parser.add_option("-c", "--cleanup-queues", action="store_true",
181
                      default=False, dest="cleanup_queues",
180
    parser.add_option("--purge-queues", action="store_true",
181
                      default=False, dest="purge_queues",
182 182
                      help="Remove all declared queues (DANGEROUS!)")
183
    parser.add_option("--purge-exchanges", action="store_true",
184
                      default=False, dest="purge_exchanges",
185
                      help="Remove all exchanges. Implies deleting all queues \
186
                           first (DANGEROUS!)")
187
    parser.add_option("--drain-queue", dest="clean_queue",
188
                      help="Acks and removes all messages from a queue")
183 189
    parser.add_option("-w", "--workers", default=2, dest="workers",
184 190
                      help="Number of workers to spawn", type="int")
185 191
    parser.add_option("-p", '--pid-file', dest="pid_file",
......
190 196
    return parser.parse_args(args)
191 197

  
192 198

  
193
def cleanup_queues() :
194
    """Delete declared queues from RabbitMQ. Use with care!"""
195
    conn = amqp.Connection( host=settings.RABBIT_HOST,
196
                            userid=settings.RABBIT_USERNAME,
197
                            password=settings.RABBIT_PASSWORD,
198
                            virtual_host=settings.RABBIT_VHOST)
199
def purge_queues() :
200
    """
201
        Delete declared queues from RabbitMQ. Use with care!
202
    """
203
    conn = get_connection()
199 204
    chan = conn.channel()
200 205

  
201 206
    print "Queues to be deleted: ",  settings.QUEUES
202
    print "Exchnages to be deleted: ", settings.EXCHANGES
203
    ans = raw_input("Are you sure (N/y):")
204 207

  
205
    if not ans:
206
        return
207
    if ans not in ['Y', 'y']:
208
    if not get_user_confirmation():
208 209
        return
209 210

  
210
    #for exchange in settings.EXCHANGES:
211
    #    try:
212
    #        chan.exchange_delete(exchange=exchange)
213
    #    except amqp.exceptions.AMQPChannelException as e:
214
    #        print e.amqp_reply_code, " ", e.amqp_reply_text
215

  
216 211
    for queue in settings.QUEUES:
217 212
        try:
218 213
            chan.queue_delete(queue=queue)
214
            print "Deleting queue %s" % queue
215
        except amqp.exceptions.AMQPChannelException as e:
216
            print e.amqp_reply_code, " ", e.amqp_reply_text
217
            chan = conn.channel()
218

  
219
    chan.connection.close()
220

  
221

  
222
def purge_exchanges():
223
    """
224
        Delete declared exchanges from RabbitMQ, after removing all queues first
225
    """
226
    purge_queues()
227

  
228
    conn = get_connection()
229
    chan = conn.channel()
230

  
231
    print "Exchnages to be deleted: ", settings.EXCHANGES
232

  
233
    if not get_user_confirmation():
234
        return
235

  
236
    for exchange in settings.EXCHANGES:
237
        try:
238
            chan.exchange_delete(exchange=exchange)
219 239
        except amqp.exceptions.AMQPChannelException as e:
220 240
            print e.amqp_reply_code, " ", e.amqp_reply_text
221
    chan.close()
241

  
222 242
    chan.connection.close()
223 243

  
224 244

  
245
def drain_queue(queue):
246
    """
247
        Strip a (declared) queue from all outstanding messages
248
    """
249
    if not queue:
250
        return
251

  
252
    if not queue in settings.QUEUES:
253
        print "Queue %s not configured" % queue
254
        return
255

  
256
    print "Queue to be drained: %s" % queue
257

  
258
    if not get_user_confirmation():
259
        return
260
    conn = get_connection()
261
    chan = conn.channel()
262

  
263
    chan.connection.close()
264

  
265
def get_connection():
266
    conn = amqp.Connection( host=settings.RABBIT_HOST,
267
                        userid=settings.RABBIT_USERNAME,
268
                        password=settings.RABBIT_PASSWORD,
269
                        virtual_host=settings.RABBIT_VHOST)
270
    return conn
271

  
272
def get_user_confirmation():
273
    ans = raw_input("Are you sure (N/y):")
274

  
275
    if not ans:
276
        return False
277
    if ans not in ['Y', 'y']:
278
        return False
279
    return True
280

  
281

  
225 282
def debug_mode():
226 283
    disp = Dispatcher(debug = True)
227 284
    signal(SIGINT, _exit_handler)
......
237 294
    logger = log.get_logger("synnefo.dispatcher")
238 295

  
239 296
    # Special case for the clean up queues action
240
    if opts.cleanup_queues:
241
        cleanup_queues()
297
    if opts.purge_queues:
298
        purge_queues()
299
        return
300

  
301
    # Special case for the clean up exch action
302
    if opts.purge_exchanges:
303
        purge_exchanges()
304
        return
305

  
306
    if opts.clean_queue:
307
        drain_queue(opts.clean_queue)
242 308
        return
243 309

  
244 310
    # Debug mode, process messages without spawning workers

Also available in: Unified diff