Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-tools / pithos / tools / dispatcher.py @ f4fbb0fa

History | View | Annotate | Download (5.2 kB)

1 2e450abb Antony Chazapis
#!/usr/bin/env python
2 2e450abb Antony Chazapis
3 2e662088 Antony Chazapis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
4 2e450abb Antony Chazapis
#
5 2e450abb Antony Chazapis
# Redistribution and use in source and binary forms, with or
6 2e450abb Antony Chazapis
# without modification, are permitted provided that the following
7 2e450abb Antony Chazapis
# conditions are met:
8 2e450abb Antony Chazapis
#
9 2e450abb Antony Chazapis
#   1. Redistributions of source code must retain the above
10 2e450abb Antony Chazapis
#      copyright notice, this list of conditions and the following
11 2e450abb Antony Chazapis
#      disclaimer.
12 2e450abb Antony Chazapis
#
13 2e450abb Antony Chazapis
#   2. Redistributions in binary form must reproduce the above
14 2e450abb Antony Chazapis
#      copyright notice, this list of conditions and the following
15 2e450abb Antony Chazapis
#      disclaimer in the documentation and/or other materials
16 2e450abb Antony Chazapis
#      provided with the distribution.
17 2e450abb Antony Chazapis
#
18 2e450abb Antony Chazapis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 2e450abb Antony Chazapis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 2e450abb Antony Chazapis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 2e450abb Antony Chazapis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 2e450abb Antony Chazapis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 2e450abb Antony Chazapis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 2e450abb Antony Chazapis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 2e450abb Antony Chazapis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 2e450abb Antony Chazapis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 2e450abb Antony Chazapis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 2e450abb Antony Chazapis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 2e450abb Antony Chazapis
# POSSIBILITY OF SUCH DAMAGE.
30 2e450abb Antony Chazapis
#
31 2e450abb Antony Chazapis
# The views and conclusions contained in the software and
32 2e450abb Antony Chazapis
# documentation are those of the authors and should not be
33 2e450abb Antony Chazapis
# interpreted as representing official policies, either expressed
34 2e450abb Antony Chazapis
# or implied, of GRNET S.A.
35 2e450abb Antony Chazapis
36 2e450abb Antony Chazapis
import sys
37 2e450abb Antony Chazapis
import logging
38 f4fbb0fa Sofia Papagiannaki
import json
39 2e450abb Antony Chazapis
40 f4fbb0fa Sofia Papagiannaki
from synnefo.lib.amqp import AMQPClient
41 9e826a59 Kostas Papadimitriou
42 a55bb574 Antony Chazapis
from optparse import OptionParser
43 6e60d547 Sofia Papagiannaki
44 6e60d547 Sofia Papagiannaki
from django.core.management import setup_environ
45 6e60d547 Sofia Papagiannaki
try:
46 6e60d547 Sofia Papagiannaki
    from synnefo import settings
47 6e60d547 Sofia Papagiannaki
except ImportError:
48 2715ade4 Sofia Papagiannaki
    raise Exception("Cannot import settings")
49 6e60d547 Sofia Papagiannaki
setup_environ(settings)
50 2e450abb Antony Chazapis
51 2e450abb Antony Chazapis
BROKER_HOST = 'localhost'
52 2e450abb Antony Chazapis
BROKER_PORT = 5672
53 2e450abb Antony Chazapis
BROKER_USER = 'guest'
54 2e450abb Antony Chazapis
BROKER_PASSWORD = 'guest'
55 2e450abb Antony Chazapis
BROKER_VHOST = '/'
56 2e450abb Antony Chazapis
57 2e450abb Antony Chazapis
CONSUMER_QUEUE = 'feed'
58 2e450abb Antony Chazapis
CONSUMER_EXCHANGE = 'sample'
59 2e450abb Antony Chazapis
CONSUMER_KEY = '#'
60 2e450abb Antony Chazapis
61 2715ade4 Sofia Papagiannaki
62 9e826a59 Kostas Papadimitriou
def main():
63 2e450abb Antony Chazapis
    parser = OptionParser()
64 2e450abb Antony Chazapis
    parser.add_option('-v', '--verbose', action='store_true', default=False,
65 2e450abb Antony Chazapis
                      dest='verbose', help='Enable verbose logging')
66 2e450abb Antony Chazapis
    parser.add_option('--host', default=BROKER_HOST, dest='host',
67 2e450abb Antony Chazapis
                      help='RabbitMQ host (default: %s)' % BROKER_HOST)
68 2e450abb Antony Chazapis
    parser.add_option('--port', default=BROKER_PORT, dest='port',
69 2e450abb Antony Chazapis
                      help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
70 2e450abb Antony Chazapis
    parser.add_option('--user', default=BROKER_USER, dest='user',
71 2e450abb Antony Chazapis
                      help='RabbitMQ user (default: %s)' % BROKER_USER)
72 2e450abb Antony Chazapis
    parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
73 2e450abb Antony Chazapis
                      help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
74 2e450abb Antony Chazapis
    parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
75 2e450abb Antony Chazapis
                      help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
76 2e450abb Antony Chazapis
    parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
77 2e450abb Antony Chazapis
                      help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
78 2e450abb Antony Chazapis
    parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
79 2e450abb Antony Chazapis
                      help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
80 2e450abb Antony Chazapis
    parser.add_option('--key', default=CONSUMER_KEY, dest='key',
81 2e450abb Antony Chazapis
                      help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
82 2e450abb Antony Chazapis
    parser.add_option('--callback', default=None, dest='callback',
83 2e450abb Antony Chazapis
                      help='Callback function to consume messages')
84 2e450abb Antony Chazapis
    parser.add_option('--test', action='store_true', default=False,
85 2e450abb Antony Chazapis
                      dest='test', help='Produce a dummy message for testing')
86 2e450abb Antony Chazapis
    opts, args = parser.parse_args()
87 2715ade4 Sofia Papagiannaki
88 47218019 Sofia Papagiannaki
    DEBUG = False
89 2e450abb Antony Chazapis
    if opts.verbose:
90 2e450abb Antony Chazapis
        DEBUG = True
91 2715ade4 Sofia Papagiannaki
    logging.basicConfig(
92 2715ade4 Sofia Papagiannaki
        format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
93 2715ade4 Sofia Papagiannaki
        datefmt='%Y-%m-%d %H:%M:%S',
94 2715ade4 Sofia Papagiannaki
        level=logging.DEBUG if DEBUG else logging.INFO)
95 2e450abb Antony Chazapis
    logger = logging.getLogger('dispatcher')
96 2715ade4 Sofia Papagiannaki
97 f4fbb0fa Sofia Papagiannaki
    host =  'amqp://%s:%s@%s:%s' % (opts.user, opts.password, opts.host, opts.port)
98 f4fbb0fa Sofia Papagiannaki
    queue = opts.queue
99 f4fbb0fa Sofia Papagiannaki
    key = opts.key
100 f4fbb0fa Sofia Papagiannaki
    exchange = opts.exchange
101 f4fbb0fa Sofia Papagiannaki
    
102 f4fbb0fa Sofia Papagiannaki
    client = AMQPClient(hosts=[host])
103 f4fbb0fa Sofia Papagiannaki
    client.connect()
104 f4fbb0fa Sofia Papagiannaki
105 2e450abb Antony Chazapis
    if opts.test:
106 f4fbb0fa Sofia Papagiannaki
        client.exchange_declare(exchange=exchange,
107 f4fbb0fa Sofia Papagiannaki
                                type='topic')
108 f4fbb0fa Sofia Papagiannaki
        client.basic_publish(exchange=exchange,
109 f4fbb0fa Sofia Papagiannaki
                             routing_key=key,
110 f4fbb0fa Sofia Papagiannaki
                             body= json.dumps({"test": "0123456789"}))
111 f4fbb0fa Sofia Papagiannaki
        client.close()
112 2e450abb Antony Chazapis
        sys.exit()
113 2715ade4 Sofia Papagiannaki
114 2e450abb Antony Chazapis
    callback = None
115 2e450abb Antony Chazapis
    if opts.callback:
116 2e450abb Antony Chazapis
        cb = opts.callback.rsplit('.', 1)
117 2e450abb Antony Chazapis
        if len(cb) == 2:
118 2e450abb Antony Chazapis
            __import__(cb[0])
119 2e450abb Antony Chazapis
            cb_module = sys.modules[cb[0]]
120 2e450abb Antony Chazapis
            callback = getattr(cb_module, cb[1])
121 2715ade4 Sofia Papagiannaki
122 f4fbb0fa Sofia Papagiannaki
    def handle_message(client, msg):
123 8f815802 Antony Chazapis
        logger.debug('%s', msg)
124 2e450abb Antony Chazapis
        if callback:
125 8f815802 Antony Chazapis
            callback(msg)
126 f4fbb0fa Sofia Papagiannaki
        client.basic_ack(msg)
127 f4fbb0fa Sofia Papagiannaki
128 f4fbb0fa Sofia Papagiannaki
    client.queue_declare(queue=queue)
129 f4fbb0fa Sofia Papagiannaki
    client.queue_bind(queue=queue,
130 f4fbb0fa Sofia Papagiannaki
                      exchange=exchange,
131 f4fbb0fa Sofia Papagiannaki
                      routing_key=key)
132 f4fbb0fa Sofia Papagiannaki
133 f4fbb0fa Sofia Papagiannaki
    client.basic_consume(queue=queue, callback=handle_message)
134 2715ade4 Sofia Papagiannaki
135 2e450abb Antony Chazapis
    try:
136 f4fbb0fa Sofia Papagiannaki
        while True:
137 f4fbb0fa Sofia Papagiannaki
            client.basic_wait()
138 2e450abb Antony Chazapis
    except KeyboardInterrupt:
139 2e450abb Antony Chazapis
        pass
140 f4fbb0fa Sofia Papagiannaki
    finally:
141 f4fbb0fa Sofia Papagiannaki
        client.close()
142 2e450abb Antony Chazapis
143 88353602 Kostas Papadimitriou
144 9e826a59 Kostas Papadimitriou
if __name__ == '__main__':
145 9e826a59 Kostas Papadimitriou
    main()