Make dispatcher use pika.
[pithos] / pithos / tools / dispatcher.py
1 #!/usr/bin/env python
2
3 # Copyright 2011-2012 GRNET S.A. All rights reserved.
4 #
5 # Redistribution and use in source and binary forms, with or
6 # without modification, are permitted provided that the following
7 # conditions are met:
8 #
9 #   1. Redistributions of source code must retain the above
10 #      copyright notice, this list of conditions and the following
11 #      disclaimer.
12 #
13 #   2. Redistributions in binary form must reproduce the above
14 #      copyright notice, this list of conditions and the following
15 #      disclaimer in the documentation and/or other materials
16 #      provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
30 #
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
35
36 import sys
37 import logging
38
39 import pika
40 import json
41
42 from optparse import OptionParser
43
44
45 BROKER_HOST = 'localhost'
46 BROKER_PORT = 5672
47 BROKER_USER = 'guest'
48 BROKER_PASSWORD = 'guest'
49 BROKER_VHOST = '/'
50
51 CONSUMER_QUEUE = 'feed'
52 CONSUMER_EXCHANGE = 'sample'
53 CONSUMER_KEY = '#'
54
55 DEBUG = False
56
57
58 if __name__ == '__main__':
59     parser = OptionParser()
60     parser.add_option('-v', '--verbose', action='store_true', default=False,
61                       dest='verbose', help='Enable verbose logging')
62     parser.add_option('--host', default=BROKER_HOST, dest='host',
63                       help='RabbitMQ host (default: %s)' % BROKER_HOST)
64     parser.add_option('--port', default=BROKER_PORT, dest='port',
65                       help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
66     parser.add_option('--user', default=BROKER_USER, dest='user',
67                       help='RabbitMQ user (default: %s)' % BROKER_USER)
68     parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
69                       help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
70     parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
71                       help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
72     parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
73                       help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
74     parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
75                       help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
76     parser.add_option('--key', default=CONSUMER_KEY, dest='key',
77                       help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
78     parser.add_option('--callback', default=None, dest='callback',
79                       help='Callback function to consume messages')
80     parser.add_option('--test', action='store_true', default=False,
81                       dest='test', help='Produce a dummy message for testing')
82     opts, args = parser.parse_args()
83     
84     if opts.verbose:
85         DEBUG = True
86     logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
87                         datefmt='%Y-%m-%d %H:%M:%S',
88                         level=logging.DEBUG if DEBUG else logging.INFO)
89     logger = logging.getLogger('dispatcher')
90     
91     connection = pika.BlockingConnection(pika.ConnectionParameters(
92                     host=opts.host, port=opts.port, virtual_host=opts.vhost,
93                     credentials=pika.PlainCredentials(opts.user, opts.password)))
94     channel = connection.channel()
95     channel.exchange_declare(exchange=opts.exchange, type='topic', durable=True)
96     if opts.test:
97         channel.basic_publish(exchange=opts.exchange,
98                               routing_key=opts.key,
99                               body=json.dumps({"test": "0123456789"}))
100         connection.close()
101         sys.exit()
102     
103     callback = None
104     if opts.callback:
105         cb = opts.callback.rsplit('.', 1)
106         if len(cb) == 2:
107             __import__(cb[0])
108             cb_module = sys.modules[cb[0]]
109             callback = getattr(cb_module, cb[1])
110     
111     def handle_delivery(channel, method_frame, header_frame, body):
112         logger.debug('Basic.Deliver %s delivery-tag %i: %s', header_frame.content_type,
113                                                              method_frame.delivery_tag,
114                                                              body)
115         if callback:
116             callback(json.loads(message_data))
117         channel.basic_ack(delivery_tag=method_frame.delivery_tag)
118     
119     channel.queue_declare(queue=opts.queue, durable=True,
120                           exclusive=False, auto_delete=False)
121     channel.queue_bind(exchange=opts.exchange,
122                        queue=opts.queue,
123                        routing_key=opts.key)
124     channel.basic_consume(handle_delivery, queue=opts.queue)
125     try:
126         channel.start_consuming()
127     except KeyboardInterrupt:
128         pass
129