Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-tools / pithos / tools / dispatcher.py @ 6e60d547

History | View | Annotate | Download (4.9 kB)

1 2e450abb Antony Chazapis
#!/usr/bin/env python
2 2e450abb Antony Chazapis
3 2e662088 Antony Chazapis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
4 2e450abb Antony Chazapis
#
5 2e450abb Antony Chazapis
# Redistribution and use in source and binary forms, with or
6 2e450abb Antony Chazapis
# without modification, are permitted provided that the following
7 2e450abb Antony Chazapis
# conditions are met:
8 2e450abb Antony Chazapis
#
9 2e450abb Antony Chazapis
#   1. Redistributions of source code must retain the above
10 2e450abb Antony Chazapis
#      copyright notice, this list of conditions and the following
11 2e450abb Antony Chazapis
#      disclaimer.
12 2e450abb Antony Chazapis
#
13 2e450abb Antony Chazapis
#   2. Redistributions in binary form must reproduce the above
14 2e450abb Antony Chazapis
#      copyright notice, this list of conditions and the following
15 2e450abb Antony Chazapis
#      disclaimer in the documentation and/or other materials
16 2e450abb Antony Chazapis
#      provided with the distribution.
17 2e450abb Antony Chazapis
#
18 2e450abb Antony Chazapis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 2e450abb Antony Chazapis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 2e450abb Antony Chazapis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 2e450abb Antony Chazapis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 2e450abb Antony Chazapis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 2e450abb Antony Chazapis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 2e450abb Antony Chazapis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 2e450abb Antony Chazapis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 2e450abb Antony Chazapis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 2e450abb Antony Chazapis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 2e450abb Antony Chazapis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 2e450abb Antony Chazapis
# POSSIBILITY OF SUCH DAMAGE.
30 2e450abb Antony Chazapis
#
31 2e450abb Antony Chazapis
# The views and conclusions contained in the software and
32 2e450abb Antony Chazapis
# documentation are those of the authors and should not be
33 2e450abb Antony Chazapis
# interpreted as representing official policies, either expressed
34 2e450abb Antony Chazapis
# or implied, of GRNET S.A.
35 2e450abb Antony Chazapis
36 2e450abb Antony Chazapis
import sys
37 2e450abb Antony Chazapis
import logging
38 2e450abb Antony Chazapis
39 6e147ecc Antony Chazapis
from synnefo.lib.queue import (exchange_connect, exchange_close,
40 8f815802 Antony Chazapis
    exchange_send, exchange_route, queue_callback, queue_start)
41 9e826a59 Kostas Papadimitriou
42 a55bb574 Antony Chazapis
from optparse import OptionParser
43 6e60d547 Sofia Papagiannaki
44 6e60d547 Sofia Papagiannaki
from django.core.management import setup_environ
45 6e60d547 Sofia Papagiannaki
try:
46 6e60d547 Sofia Papagiannaki
    from synnefo import settings
47 6e60d547 Sofia Papagiannaki
except ImportError:
48 6e60d547 Sofia Papagiannaki
   raise Exception("Cannot import settings")
49 6e60d547 Sofia Papagiannaki
setup_environ(settings)
50 2e450abb Antony Chazapis
51 2e450abb Antony Chazapis
BROKER_HOST = 'localhost'
52 2e450abb Antony Chazapis
BROKER_PORT = 5672
53 2e450abb Antony Chazapis
BROKER_USER = 'guest'
54 2e450abb Antony Chazapis
BROKER_PASSWORD = 'guest'
55 2e450abb Antony Chazapis
BROKER_VHOST = '/'
56 2e450abb Antony Chazapis
57 2e450abb Antony Chazapis
CONSUMER_QUEUE = 'feed'
58 2e450abb Antony Chazapis
CONSUMER_EXCHANGE = 'sample'
59 2e450abb Antony Chazapis
CONSUMER_KEY = '#'
60 2e450abb Antony Chazapis
61 9e826a59 Kostas Papadimitriou
def main():
62 2e450abb Antony Chazapis
    parser = OptionParser()
63 2e450abb Antony Chazapis
    parser.add_option('-v', '--verbose', action='store_true', default=False,
64 2e450abb Antony Chazapis
                      dest='verbose', help='Enable verbose logging')
65 2e450abb Antony Chazapis
    parser.add_option('--host', default=BROKER_HOST, dest='host',
66 2e450abb Antony Chazapis
                      help='RabbitMQ host (default: %s)' % BROKER_HOST)
67 2e450abb Antony Chazapis
    parser.add_option('--port', default=BROKER_PORT, dest='port',
68 2e450abb Antony Chazapis
                      help='RabbitMQ port (default: %s)' % BROKER_PORT, type='int')
69 2e450abb Antony Chazapis
    parser.add_option('--user', default=BROKER_USER, dest='user',
70 2e450abb Antony Chazapis
                      help='RabbitMQ user (default: %s)' % BROKER_USER)
71 2e450abb Antony Chazapis
    parser.add_option('--password', default=BROKER_PASSWORD, dest='password',
72 2e450abb Antony Chazapis
                      help='RabbitMQ password (default: %s)' % BROKER_PASSWORD)
73 2e450abb Antony Chazapis
    parser.add_option('--vhost', default=BROKER_VHOST, dest='vhost',
74 2e450abb Antony Chazapis
                      help='RabbitMQ vhost (default: %s)' % BROKER_VHOST)
75 2e450abb Antony Chazapis
    parser.add_option('--queue', default=CONSUMER_QUEUE, dest='queue',
76 2e450abb Antony Chazapis
                      help='RabbitMQ queue (default: %s)' % CONSUMER_QUEUE)
77 2e450abb Antony Chazapis
    parser.add_option('--exchange', default=CONSUMER_EXCHANGE, dest='exchange',
78 2e450abb Antony Chazapis
                      help='RabbitMQ exchange (default: %s)' % CONSUMER_EXCHANGE)
79 2e450abb Antony Chazapis
    parser.add_option('--key', default=CONSUMER_KEY, dest='key',
80 2e450abb Antony Chazapis
                      help='RabbitMQ key (default: %s)' % CONSUMER_KEY)
81 2e450abb Antony Chazapis
    parser.add_option('--callback', default=None, dest='callback',
82 2e450abb Antony Chazapis
                      help='Callback function to consume messages')
83 2e450abb Antony Chazapis
    parser.add_option('--test', action='store_true', default=False,
84 2e450abb Antony Chazapis
                      dest='test', help='Produce a dummy message for testing')
85 2e450abb Antony Chazapis
    opts, args = parser.parse_args()
86 2e450abb Antony Chazapis
    
87 47218019 Sofia Papagiannaki
    DEBUG = False
88 2e450abb Antony Chazapis
    if opts.verbose:
89 2e450abb Antony Chazapis
        DEBUG = True
90 2e450abb Antony Chazapis
    logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s',
91 2e450abb Antony Chazapis
                        datefmt='%Y-%m-%d %H:%M:%S',
92 2e450abb Antony Chazapis
                        level=logging.DEBUG if DEBUG else logging.INFO)
93 2e450abb Antony Chazapis
    logger = logging.getLogger('dispatcher')
94 2e450abb Antony Chazapis
    
95 8f815802 Antony Chazapis
    exchange = 'rabbitmq://%s:%s@%s:%s/%s' % (opts.user, opts.password, opts.host, opts.port, opts.exchange)
96 8f815802 Antony Chazapis
    connection = exchange_connect(exchange)
97 2e450abb Antony Chazapis
    if opts.test:
98 8f815802 Antony Chazapis
        exchange_send(connection, opts.key, {"test": "0123456789"})
99 8f815802 Antony Chazapis
        exchange_close(connection)
100 2e450abb Antony Chazapis
        sys.exit()
101 2e450abb Antony Chazapis
    
102 2e450abb Antony Chazapis
    callback = None
103 2e450abb Antony Chazapis
    if opts.callback:
104 2e450abb Antony Chazapis
        cb = opts.callback.rsplit('.', 1)
105 2e450abb Antony Chazapis
        if len(cb) == 2:
106 2e450abb Antony Chazapis
            __import__(cb[0])
107 2e450abb Antony Chazapis
            cb_module = sys.modules[cb[0]]
108 2e450abb Antony Chazapis
            callback = getattr(cb_module, cb[1])
109 2e450abb Antony Chazapis
    
110 8f815802 Antony Chazapis
    def handle_message(msg):
111 8f815802 Antony Chazapis
        logger.debug('%s', msg)
112 2e450abb Antony Chazapis
        if callback:
113 8f815802 Antony Chazapis
            callback(msg)
114 2e450abb Antony Chazapis
    
115 8f815802 Antony Chazapis
    exchange_route(connection, opts.key, opts.queue)
116 8f815802 Antony Chazapis
    queue_callback(connection, opts.queue, handle_message)
117 2e450abb Antony Chazapis
    try:
118 8f815802 Antony Chazapis
        queue_start(connection)
119 2e450abb Antony Chazapis
    except KeyboardInterrupt:
120 2e450abb Antony Chazapis
        pass
121 2e450abb Antony Chazapis
122 88353602 Kostas Papadimitriou
123 9e826a59 Kostas Papadimitriou
if __name__ == '__main__':
124 9e826a59 Kostas Papadimitriou
    main()