The dispatcher is a python executable which monitors a RabbitMQ queue and calls a...
[pithos] / pithos / tools / dispatcher.py
diff --git a/pithos/tools/dispatcher.py b/pithos/tools/dispatcher.py
new file mode 100755 (executable)
index 0000000..e66b629
--- /dev/null
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+import sys
+import logging
+
+from optparse import OptionParser
+
+from carrot.connection import BrokerConnection
+from carrot.messaging import Consumer
+from carrot.messaging import Publisher
+
+
+BROKER_HOST = 'localhost'
+BROKER_PORT = 5672
+BROKER_USER = 'guest'
+BROKER_PASSWORD = 'guest'
+BROKER_VHOST = '/'
+
+CONSUMER_QUEUE = 'feed'
+CONSUMER_EXCHANGE = 'sample'
+CONSUMER_KEY = '#'
+
+DEBUG = False
+
+
+if __name__ == '__main__':
+    parser = OptionParser()
+    parser.add_option('-v', '--verbose', action='store_true', default=False,
+                      dest='verbose', help='Enable verbose logging')
+    parser.add_option('--host', default=BROKER_HOST, dest='host',
+                      help='RabbitMQ host (default: %s)' % BROKER_HOST)
+    parser.add_option('--port', default=BROKER_PORT, dest='port',
+                      help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
+    parser.add_option('--user', default=BROKER_USER, dest='user',
+                      help='RabbitMQ user (default: %s)' % BROKER_USER)
+    parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
+                      help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
+    parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
+                      help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
+    parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
+                      help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
+    parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
+                      help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
+    parser.add_option('--key', default=CONSUMER_KEY, dest='key',
+                      help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
+    parser.add_option('--callback', default=None, dest='callback',
+                      help='Callback function to consume messages')
+    parser.add_option('--test', action='store_true', default=False,
+                      dest='test', help='Produce a dummy message for testing')
+    opts, args = parser.parse_args()
+    
+    if opts.verbose:
+        DEBUG = True
+    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
+                        datefmt='%Y-%m-%d %H:%M:%S',
+                        level=logging.DEBUG if DEBUG else logging.INFO)
+    logger = logging.getLogger('dispatcher')
+    
+    conn = BrokerConnection(hostname=opts.host, port=opts.port,
+                            userid=opts.user, password=opts.password,
+                            virtual_host=opts.vhost)
+    if opts.test:
+        publisher = Publisher(connection=conn,
+                              exchange=opts.exchange, routing_key=opts.key,
+                              exchange_type="topic")
+        publisher.send({"test": "0123456789"})
+        publisher.close()
+        conn.close()
+        sys.exit()
+    consumer = Consumer(connection=conn, queue=opts.queue,
+                        exchange=opts.exchange, routing_key=opts.key,
+                        exchange_type="topic")
+    
+    callback = None
+    if opts.callback:
+        cb = opts.callback.rsplit('.', 1)
+        if len(cb) == 2:
+            __import__(cb[0])
+            cb_module = sys.modules[cb[0]]
+            callback = getattr(cb_module, cb[1])
+    
+    def process_message(message_data, message):
+        logger.debug('%s', message_data)
+        if callback:
+            callback(message_data)
+        message.ack()
+    
+    consumer.register_callback(process_message)
+    try:
+        consumer.wait()
+    except KeyboardInterrupt:
+        pass
+