Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-tools / pithos / tools / dispatcher.py @ 9e826a59

History | View | Annotate | Download (5.2 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
import sys
37
import logging
38

    
39
from optparse import OptionParser
40

    
41
try:
42
    from carrot.connection import BrokerConnection
43
    from carrot.messaging import Consumer
44
    from carrot.messaging import Publisher
45
except ImportError:
46
    sys.stderr.write("Dispatcher requires 'carrot' python library to " \
47
                     "be installed\n")
48
    sys.exit(1)
49

    
50

    
51

    
52
BROKER_HOST = 'localhost'
53
BROKER_PORT = 5672
54
BROKER_USER = 'guest'
55
BROKER_PASSWORD = 'guest'
56
BROKER_VHOST = '/'
57

    
58
CONSUMER_QUEUE = 'feed'
59
CONSUMER_EXCHANGE = 'sample'
60
CONSUMER_KEY = '#'
61

    
62
DEBUG = False
63

    
64

    
65
def main():
66
    parser = OptionParser()
67
    parser.add_option('-v', '--verbose', action='store_true', default=False,
68
                      dest='verbose', help='Enable verbose logging')
69
    parser.add_option('--host', default=BROKER_HOST, dest='host',
70
                      help='RabbitMQ host (default: %s)' % BROKER_HOST)
71
    parser.add_option('--port', default=BROKER_PORT, dest='port',
72
                      help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
73
    parser.add_option('--user', default=BROKER_USER, dest='user',
74
                      help='RabbitMQ user (default: %s)' % BROKER_USER)
75
    parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
76
                      help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
77
    parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
78
                      help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
79
    parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
80
                      help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
81
    parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
82
                      help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
83
    parser.add_option('--key', default=CONSUMER_KEY, dest='key',
84
                      help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
85
    parser.add_option('--callback', default=None, dest='callback',
86
                      help='Callback function to consume messages')
87
    parser.add_option('--test', action='store_true', default=False,
88
                      dest='test', help='Produce a dummy message for testing')
89
    opts, args = parser.parse_args()
90

    
91
    if opts.verbose:
92
        DEBUG = True
93
    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
94
                        datefmt='%Y-%m-%d %H:%M:%S',
95
                        level=logging.DEBUG if DEBUG else logging.INFO)
96
    logger = logging.getLogger('dispatcher')
97

    
98
    conn = BrokerConnection(hostname=opts.host, port=opts.port,
99
                            userid=opts.user, password=opts.password,
100
                            virtual_host=opts.vhost)
101
    if opts.test:
102
        publisher = Publisher(connection=conn,
103
                              exchange=opts.exchange, routing_key=opts.key,
104
                              exchange_type="topic")
105
        publisher.send({"test": "0123456789"})
106
        publisher.close()
107
        conn.close()
108
        sys.exit()
109
    consumer = Consumer(connection=conn, queue=opts.queue,
110
                        exchange=opts.exchange, routing_key=opts.key,
111
                        exchange_type="topic")
112

    
113
    callback = None
114
    if opts.callback:
115
        cb = opts.callback.rsplit('.', 1)
116
        if len(cb) == 2:
117
            __import__(cb[0])
118
            cb_module = sys.modules[cb[0]]
119
            callback = getattr(cb_module, cb[1])
120

    
121
    def process_message(message_data, message):
122
        logger.debug('%s', message_data)
123
        if callback:
124
            callback(message_data)
125
        message.ack()
126

    
127
    consumer.register_callback(process_message)
128
    try:
129
        consumer.wait()
130
    except KeyboardInterrupt:
131
        pass
132

    
133
if __name__ == '__main__':
134
    main()
135