import sys
import logging
-from optparse import OptionParser
+import pika
+import json
-from carrot.connection import BrokerConnection
-from carrot.messaging import Consumer
-from carrot.messaging import Publisher
+from optparse import OptionParser
BROKER_HOST = 'localhost'
level=logging.DEBUG if DEBUG else logging.INFO)
logger = logging.getLogger('dispatcher')
- conn = BrokerConnection(hostname=opts.host, port=opts.port,
- userid=opts.user, password=opts.password,
- virtual_host=opts.vhost)
+ connection = pika.BlockingConnection(pika.ConnectionParameters(
+ host=opts.host, port=opts.port, virtual_host=opts.vhost,
+ credentials=pika.PlainCredentials(opts.user, opts.password)))
+ channel = connection.channel()
+ channel.exchange_declare(exchange=opts.exchange, type='topic', durable=True)
if opts.test:
- publisher = Publisher(connection=conn,
- exchange=opts.exchange, routing_key=opts.key,
- exchange_type="topic")
- publisher.send({"test": "0123456789"})
- publisher.close()
- conn.close()
+ channel.basic_publish(exchange=opts.exchange,
+ routing_key=opts.key,
+ body=json.dumps({"test": "0123456789"}))
+ connection.close()
sys.exit()
- consumer = Consumer(connection=conn, queue=opts.queue,
- exchange=opts.exchange, routing_key=opts.key,
- exchange_type="topic")
callback = None
if opts.callback:
cb_module = sys.modules[cb[0]]
callback = getattr(cb_module, cb[1])
- def process_message(message_data, message):
- logger.debug('%s', message_data)
+ def handle_delivery(channel, method_frame, header_frame, body):
+ logger.debug('Basic.Deliver %s delivery-tag %i: %s', header_frame.content_type,
+ method_frame.delivery_tag,
+ body)
if callback:
- callback(message_data)
- message.ack()
+ callback(json.loads(message_data))
+ channel.basic_ack(delivery_tag=method_frame.delivery_tag)
- consumer.register_callback(process_message)
+ channel.queue_declare(queue=opts.queue, durable=True,
+ exclusive=False, auto_delete=False)
+ channel.queue_bind(exchange=opts.exchange,
+ queue=opts.queue,
+ routing_key=opts.key)
+ channel.basic_consume(handle_delivery, queue=opts.queue)
try:
- consumer.wait()
+ channel.start_consuming()
except KeyboardInterrupt:
pass