3 # Copyright 2011-2012 GRNET S.A. All rights reserved.
5 # Redistribution and use in source and binary forms, with or
6 # without modification, are permitted provided that the following
# conditions are met:
9 # 1. Redistributions of source code must retain the above
10 # copyright notice, this list of conditions and the following
# disclaimer.
13 # 2. Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials
16 # provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
39 from optparse import OptionParser
42 from carrot.connection import BrokerConnection
43 from carrot.messaging import Consumer
44 from carrot.messaging import Publisher
46 sys.stderr.write("Dispatcher requires 'carrot' python library to " \
# Built-in defaults for the broker connection and the consumer binding. Each
# one is used as the default value of the matching command-line option and is
# echoed in that option's help text.
# NOTE(review): BROKER_PORT, BROKER_USER, BROKER_VHOST and CONSUMER_KEY are
# referenced by the option definitions as well, but their assignments are not
# visible in this excerpt -- confirm they are defined alongside these.
52 BROKER_HOST = 'localhost'
55 BROKER_PASSWORD = 'guest'
58 CONSUMER_QUEUE = 'feed'
59 CONSUMER_EXCHANGE = 'sample'
# Build the command-line interface. Every broker/consumer option falls back to
# the corresponding module-level default, which is also shown in its help text.
66 parser = OptionParser()
# -v/--verbose: boolean flag stored as opts.verbose (off by default).
67 parser.add_option('-v', '--verbose', action='store_true', default=False,
68 dest='verbose', help='Enable verbose logging')
# Broker connection parameters. --port is the only option converted to int;
# the others are kept as strings.
69 parser.add_option('--host', default=BROKER_HOST, dest='host',
70 help='RabbitMQ host (default: %s)' % BROKER_HOST)
71 parser.add_option('--port', default=BROKER_PORT, dest='port',
72 help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
73 parser.add_option('--user', default=BROKER_USER, dest='user',
74 help='RabbitMQ user (default: %s)' % BROKER_USER)
# NOTE(review): the default password is interpolated into the --help output.
75 parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
76 help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
77 parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
78 help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
# Consumer binding: queue name, exchange name and routing key.
79 parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
80 help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
81 parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
82 help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
83 parser.add_option('--key', default=CONSUMER_KEY, dest='key',
84 help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
# --callback: stored as opts.callback; None when the option is not given.
85 parser.add_option('--callback', default=None, dest='callback',
86 help='Callback function to consume messages')
# --test: boolean flag stored as opts.test (off by default).
87 parser.add_option('--test', action='store_true', default=False,
88 dest='test', help='Produce a dummy message for testing')
# Parse the process command line; leftover positional arguments go to `args`.
89 opts, args = parser.parse_args()
# Configure logging with timestamped records: DEBUG level when the DEBUG flag
# is truthy, INFO otherwise. Then obtain the logger named 'dispatcher'.
# NOTE(review): DEBUG is not assigned anywhere in the visible lines; presumably
# it is tied to the --verbose option -- confirm.
93 logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
94 datefmt='%Y-%m-%d %H:%M:%S',
95 level=logging.DEBUG if DEBUG else logging.INFO)
96 logger = logging.getLogger('dispatcher')
# Create the broker connection object from the parsed host, port, user,
# password and vhost options.
98 conn = BrokerConnection(hostname=opts.host, port=opts.port,
99 userid=opts.user, password=opts.password,
100 virtual_host=opts.vhost)
# Publish one fixed dummy payload to the configured exchange and routing key,
# with the exchange declared as type "topic".
# NOTE(review): presumably this only runs when --test is given; the guarding
# condition is not visible in this excerpt -- confirm.
102 publisher = Publisher(connection=conn,
103 exchange=opts.exchange, routing_key=opts.key,
104 exchange_type="topic")
105 publisher.send({"test": "0123456789"})
# Create the consumer for the configured queue, exchange and routing key, with
# the exchange declared as type "topic".
109 consumer = Consumer(connection=conn, queue=opts.queue,
110 exchange=opts.exchange, routing_key=opts.key,
111 exchange_type="topic")
# Resolve the --callback value, treated as a dotted "module.attribute" path:
# split once at the last dot, look the module part up in sys.modules, and fetch
# the named attribute from it.
# NOTE(review): sys.modules only holds modules that are already imported, so
# the module must have been imported before this lookup; no import step is
# visible in this excerpt -- confirm. opts.callback defaults to None, so this
# also relies on a guard that is not visible here.
115 cb = opts.callback.rsplit('.', 1)
118 cb_module = sys.modules[cb[0]]
119 callback = getattr(cb_module, cb[1])
# Message handler: logs message_data at DEBUG level and passes it to the
# resolved callback.
#   message_data -- the payload forwarded to the callback.
#   message      -- the message object; not used in the visible lines.
# NOTE(review): lines appear to be missing around the callback call (for
# example a guard or an acknowledgement) -- confirm against the full file.
121 def process_message(message_data, message):
122 logger.debug('%s', message_data)
124 callback(message_data)
# Register process_message as the consumer's message callback.
127 consumer.register_callback(process_message)
# Ctrl-C handler; the try block it belongs to and its own body are not visible
# in this excerpt.
130 except KeyboardInterrupt:
# Script entry point; the guarded body is not visible in this excerpt.
133 if __name__ == '__main__':