Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-tools / pithos / tools / dispatcher.py @ 94a83ed6

History | View | Annotate | Download (5.2 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2011-2012 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

    
36
import sys
37
import logging
38
import json
39

    
40
from synnefo.lib.amqp import AMQPClient
41

    
42
from optparse import OptionParser
43

    
44
from django.core.management import setup_environ
45
try:
46
    from synnefo import settings
47
except ImportError:
48
    raise Exception("Cannot import settings")
49
setup_environ(settings)
50

    
51
BROKER_HOST = 'localhost'
52
BROKER_PORT = 5672
53
BROKER_USER = 'guest'
54
BROKER_PASSWORD = 'guest'
55
BROKER_VHOST = '/'
56

    
57
CONSUMER_QUEUE = 'feed'
58
CONSUMER_EXCHANGE = 'sample'
59
CONSUMER_KEY = '#'
60

    
61

    
62
def main():
63
    parser = OptionParser()
64
    parser.add_option('-v', '--verbose', action='store_true', default=False,
65
                      dest='verbose', help='Enable verbose logging')
66
    parser.add_option('--host', default=BROKER_HOST, dest='host',
67
                      help='RabbitMQ host (default: %s)' % BROKER_HOST)
68
    parser.add_option('--port', default=BROKER_PORT, dest='port',
69
                      help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
70
    parser.add_option('--user', default=BROKER_USER, dest='user',
71
                      help='RabbitMQ user (default: %s)' % BROKER_USER)
72
    parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
73
                      help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
74
    parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
75
                      help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
76
    parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
77
                      help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
78
    parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
79
                      help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
80
    parser.add_option('--key', default=CONSUMER_KEY, dest='key',
81
                      help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
82
    parser.add_option('--callback', default=None, dest='callback',
83
                      help='Callback function to consume messages')
84
    parser.add_option('--test', action='store_true', default=False,
85
                      dest='test', help='Produce a dummy message for testing')
86
    opts, args = parser.parse_args()
87

    
88
    DEBUG = False
89
    if opts.verbose:
90
        DEBUG = True
91
    logging.basicConfig(
92
        format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
93
        datefmt='%Y-%m-%d %H:%M:%S',
94
        level=logging.DEBUG if DEBUG else logging.INFO)
95
    logger = logging.getLogger('dispatcher')
96

    
97
    host =  'amqp://%s:%s@%s:%s' % (opts.user, opts.password, opts.host, opts.port)
98
    queue = opts.queue
99
    key = opts.key
100
    exchange = opts.exchange
101
    
102
    client = AMQPClient(hosts=[host])
103
    client.connect()
104

    
105
    if opts.test:
106
        client.exchange_declare(exchange=exchange,
107
                                type='topic')
108
        client.basic_publish(exchange=exchange,
109
                             routing_key=key,
110
                             body= json.dumps({"test": "0123456789"}))
111
        client.close()
112
        sys.exit()
113

    
114
    callback = None
115
    if opts.callback:
116
        cb = opts.callback.rsplit('.', 1)
117
        if len(cb) == 2:
118
            __import__(cb[0])
119
            cb_module = sys.modules[cb[0]]
120
            callback = getattr(cb_module, cb[1])
121

    
122
    def handle_message(client, msg):
123
        logger.debug('%s', msg)
124
        if callback:
125
            callback(msg)
126
        client.basic_ack(msg)
127

    
128
    client.queue_declare(queue=queue)
129
    client.queue_bind(queue=queue,
130
                      exchange=exchange,
131
                      routing_key=key)
132

    
133
    client.basic_consume(queue=queue, callback=handle_message)
134

    
135
    try:
136
        while True:
137
            client.basic_wait()
138
    except KeyboardInterrupt:
139
        pass
140
    finally:
141
        client.close()
142

    
143

    
144
if __name__ == '__main__':
145
    main()