Make dispatcher use pika.
authorAntony Chazapis <chazapis@gmail.com>
Fri, 27 Jan 2012 15:17:03 +0000 (17:17 +0200)
committerAntony Chazapis <chazapis@gmail.com>
Fri, 27 Jan 2012 15:17:03 +0000 (17:17 +0200)
pithos/tools/dispatcher.py

index 8f6ed4e..d3539a9 100755 (executable)
 import sys
 import logging
 
-from optparse import OptionParser
+import pika
+import json
 
-from carrot.connection import BrokerConnection
-from carrot.messaging import Consumer
-from carrot.messaging import Publisher
+from optparse import OptionParser
 
 
 BROKER_HOST = 'localhost'
@@ -89,20 +88,17 @@ if __name__ == '__main__':
                         level=logging.DEBUG if DEBUG else logging.INFO)
     logger = logging.getLogger('dispatcher')
     
-    conn = BrokerConnection(hostname=opts.host, port=opts.port,
-                            userid=opts.user, password=opts.password,
-                            virtual_host=opts.vhost)
+    connection = pika.BlockingConnection(pika.ConnectionParameters(
+                    host=opts.host, port=opts.port, virtual_host=opts.vhost,
+                    credentials=pika.PlainCredentials(opts.user, opts.password)))
+    channel = connection.channel()
+    channel.exchange_declare(exchange=opts.exchange, type='topic', durable=True)
     if opts.test:
-        publisher = Publisher(connection=conn,
-                              exchange=opts.exchange, routing_key=opts.key,
-                              exchange_type="topic")
-        publisher.send({"test": "0123456789"})
-        publisher.close()
-        conn.close()
+        channel.basic_publish(exchange=opts.exchange,
+                              routing_key=opts.key,
+                              body=json.dumps({"test": "0123456789"}))
+        connection.close()
         sys.exit()
-    consumer = Consumer(connection=conn, queue=opts.queue,
-                        exchange=opts.exchange, routing_key=opts.key,
-                        exchange_type="topic")
     
     callback = None
     if opts.callback:
@@ -112,15 +108,22 @@ if __name__ == '__main__':
             cb_module = sys.modules[cb[0]]
             callback = getattr(cb_module, cb[1])
     
-    def process_message(message_data, message):
-        logger.debug('%s', message_data)
+    def handle_delivery(channel, method_frame, header_frame, body):
+        logger.debug('Basic.Deliver %s delivery-tag %i: %s', header_frame.content_type,
+                                                             method_frame.delivery_tag,
+                                                             body)
         if callback:
-            callback(message_data)
-        message.ack()
+            callback(json.loads(message_data))
+        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
     
-    consumer.register_callback(process_message)
+    channel.queue_declare(queue=opts.queue, durable=True,
+                          exclusive=False, auto_delete=False)
+    channel.queue_bind(exchange=opts.exchange,
+                       queue=opts.queue,
+                       routing_key=opts.key)
+    channel.basic_consume(handle_delivery, queue=opts.queue)
     try:
-        consumer.wait()
+        channel.start_consuming()
     except KeyboardInterrupt:
         pass