from optparse import OptionParser
-from carrot.connection import BrokerConnection
-from carrot.messaging import Consumer
-from carrot.messaging import Publisher
+try:
+ from carrot.connection import BrokerConnection
+ from carrot.messaging import Consumer
+ from carrot.messaging import Publisher
+except ImportError:
+ sys.stderr.write("Dispatcher requires 'carrot' python library to " \
+ "be installed\n")
+ sys.exit(1)
+
+# NOTE(review): presumably the default broker host; it is not referenced in
+# the visible part of this patch -- confirm where it is used.
BROKER_HOST = 'localhost'
+# Verbose-logging flag, off by default; see the -v/--verbose handling in main().
DEBUG = False
-if __name__ == '__main__':
+def main():
+ """Parse command-line options and consume broker messages until interrupted.
+
+ Configures logging, opens a carrot ``BrokerConnection`` and a topic
+ ``Consumer`` from the parsed options, optionally resolves the dotted-path
+ callable named by ``opts.callback``, and then blocks in
+ ``consumer.wait()``. A ``KeyboardInterrupt`` (Ctrl-C) ends the wait
+ quietly.
+ """
+ # DEBUG is assigned below. Without this declaration the assignment would
+ # make DEBUG local to main(), and reading it when --verbose is absent
+ # would raise UnboundLocalError.
+ global DEBUG
parser = OptionParser()
parser.add_option('-v', '--verbose', action='store_true', default=False,
dest='verbose', help='Enable verbose logging')
parser.add_option('--test', action='store_true', default=False,
dest='test', help='Produce a dummy message for testing')
opts, args = parser.parse_args()
-
+
if opts.verbose:
DEBUG = True
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG if DEBUG else logging.INFO)
logger = logging.getLogger('dispatcher')
-
+
conn = BrokerConnection(hostname=opts.host, port=opts.port,
userid=opts.user, password=opts.password,
virtual_host=opts.vhost)
consumer = Consumer(connection=conn, queue=opts.queue,
exchange=opts.exchange, routing_key=opts.key,
exchange_type="topic")
-
+
callback = None
if opts.callback:
+ # opts.callback is a dotted path "package.module.attr"; split off the
+ # attribute name. __import__ on a dotted name returns the top-level
+ # package, so the actual module is fetched from sys.modules instead.
cb = opts.callback.rsplit('.', 1)
__import__(cb[0])
cb_module = sys.modules[cb[0]]
callback = getattr(cb_module, cb[1])
-
+
def process_message(message_data, message):
+ """Log the payload, pass it to the optional callback, then ack."""
logger.debug('%s', message_data)
if callback:
callback(message_data)
+ # Reached only if the callback returned without raising.
message.ack()
-
+
consumer.register_callback(process_message)
try:
consumer.wait()
except KeyboardInterrupt:
+ # Ctrl-C is the expected way to stop the dispatcher; exit quietly.
pass
+# Start the dispatcher only when this file is run as a script, not on import.
+if __name__ == '__main__':
+ main()
+