Revision 2715ade4 snf-pithos-tools/pithos/tools/dispatcher.py
b/snf-pithos-tools/pithos/tools/dispatcher.py | ||
---|---|---|
37 | 37 |
import logging |
38 | 38 |
|
39 | 39 |
from synnefo.lib.queue import (exchange_connect, exchange_close, |
40 |
exchange_send, exchange_route, queue_callback, queue_start) |
|
40 |
exchange_send, exchange_route, queue_callback, queue_start)
|
|
41 | 41 |
|
42 | 42 |
from optparse import OptionParser |
43 | 43 |
|
... | ... | |
45 | 45 |
try: |
46 | 46 |
from synnefo import settings |
47 | 47 |
except ImportError: |
48 |
raise Exception("Cannot import settings") |
|
48 |
raise Exception("Cannot import settings")
|
|
49 | 49 |
setup_environ(settings) |
50 | 50 |
|
51 | 51 |
BROKER_HOST = 'localhost' |
... | ... | |
58 | 58 |
CONSUMER_EXCHANGE = 'sample' |
59 | 59 |
CONSUMER_KEY = '#' |
60 | 60 |
|
61 |
|
|
61 | 62 |
def main(): |
62 | 63 |
parser = OptionParser() |
63 | 64 |
parser.add_option('-v', '--verbose', action='store_true', default=False, |
... | ... | |
83 | 84 |
parser.add_option('--test', action='store_true', default=False, |
84 | 85 |
dest='test', help='Produce a dummy message for testing') |
85 | 86 |
opts, args = parser.parse_args() |
86 |
|
|
87 |
|
|
87 | 88 |
DEBUG = False |
88 | 89 |
if opts.verbose: |
89 | 90 |
DEBUG = True |
90 |
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(name)s %(message)s', |
|
91 |
datefmt='%Y-%m-%d %H:%M:%S', |
|
92 |
level=logging.DEBUG if DEBUG else logging.INFO) |
|
91 |
logging.basicConfig( |
|
92 |
format='%(asctime)s [%(levelname)s] %(name)s %(message)s', |
|
93 |
datefmt='%Y-%m-%d %H:%M:%S', |
|
94 |
level=logging.DEBUG if DEBUG else logging.INFO) |
|
93 | 95 |
logger = logging.getLogger('dispatcher') |
94 |
|
|
95 |
exchange = 'rabbitmq://%s:%s@%s:%s/%s' % (opts.user, opts.password, opts.host, opts.port, opts.exchange) |
|
96 |
|
|
97 |
exchange = 'rabbitmq://%s:%s@%s:%s/%s' % ( |
|
98 |
opts.user, opts.password, opts.host, opts.port, opts.exchange) |
|
96 | 99 |
connection = exchange_connect(exchange) |
97 | 100 |
if opts.test: |
98 | 101 |
exchange_send(connection, opts.key, {"test": "0123456789"}) |
99 | 102 |
exchange_close(connection) |
100 | 103 |
sys.exit() |
101 |
|
|
104 |
|
|
102 | 105 |
callback = None |
103 | 106 |
if opts.callback: |
104 | 107 |
cb = opts.callback.rsplit('.', 1) |
... | ... | |
106 | 109 |
__import__(cb[0]) |
107 | 110 |
cb_module = sys.modules[cb[0]] |
108 | 111 |
callback = getattr(cb_module, cb[1]) |
109 |
|
|
112 |
|
|
110 | 113 |
def handle_message(msg): |
114 |
print msg |
|
111 | 115 |
logger.debug('%s', msg) |
112 | 116 |
if callback: |
113 | 117 |
callback(msg) |
114 |
|
|
118 |
|
|
115 | 119 |
exchange_route(connection, opts.key, opts.queue) |
116 | 120 |
queue_callback(connection, opts.queue, handle_message) |
117 | 121 |
try: |
Also available in: Unified diff