Revision f4fbb0fa snf-pithos-tools/pithos/tools/dispatcher.py
b/snf-pithos-tools/pithos/tools/dispatcher.py | ||
---|---|---|
35 | 35 |
|
36 | 36 |
import sys |
37 | 37 |
import logging |
38 |
import json |
|
38 | 39 |
|
39 |
from synnefo.lib.queue import (exchange_connect, exchange_close, |
|
40 |
exchange_send, exchange_route, queue_callback, queue_start) |
|
40 |
from synnefo.lib.amqp import AMQPClient |
|
41 | 41 |
|
42 | 42 |
from optparse import OptionParser |
43 | 43 |
|
... | ... | |
94 | 94 |
level=logging.DEBUG if DEBUG else logging.INFO) |
95 | 95 |
logger = logging.getLogger('dispatcher') |
96 | 96 |
|
97 |
exchange = 'rabbitmq://%s:%s@%s:%s/%s' % ( |
|
98 |
opts.user, opts.password, opts.host, opts.port, opts.exchange) |
|
99 |
connection = exchange_connect(exchange) |
|
97 |
host = 'amqp://%s:%s@%s:%s' % (opts.user, opts.password, opts.host, opts.port) |
|
98 |
queue = opts.queue |
|
99 |
key = opts.key |
|
100 |
exchange = opts.exchange |
|
101 |
|
|
102 |
client = AMQPClient(hosts=[host]) |
|
103 |
client.connect() |
|
104 |
|
|
100 | 105 |
if opts.test: |
101 |
exchange_send(connection, opts.key, {"test": "0123456789"}) |
|
102 |
exchange_close(connection) |
|
106 |
client.exchange_declare(exchange=exchange, |
|
107 |
type='topic') |
|
108 |
client.basic_publish(exchange=exchange, |
|
109 |
routing_key=key, |
|
110 |
body= json.dumps({"test": "0123456789"})) |
|
111 |
client.close() |
|
103 | 112 |
sys.exit() |
104 | 113 |
|
105 | 114 |
callback = None |
... | ... | |
110 | 119 |
cb_module = sys.modules[cb[0]] |
111 | 120 |
callback = getattr(cb_module, cb[1]) |
112 | 121 |
|
113 |
def handle_message(msg): |
|
114 |
print msg |
|
122 |
def handle_message(client, msg): |
|
115 | 123 |
logger.debug('%s', msg) |
116 | 124 |
if callback: |
117 | 125 |
callback(msg) |
126 |
client.basic_ack(msg) |
|
127 |
|
|
128 |
client.queue_declare(queue=queue) |
|
129 |
client.queue_bind(queue=queue, |
|
130 |
exchange=exchange, |
|
131 |
routing_key=key) |
|
132 |
|
|
133 |
client.basic_consume(queue=queue, callback=handle_message) |
|
118 | 134 |
|
119 |
exchange_route(connection, opts.key, opts.queue) |
|
120 |
queue_callback(connection, opts.queue, handle_message) |
|
121 | 135 |
try: |
122 |
queue_start(connection) |
|
136 |
while True: |
|
137 |
client.basic_wait() |
|
123 | 138 |
except KeyboardInterrupt: |
124 | 139 |
pass |
140 |
finally: |
|
141 |
client.close() |
|
125 | 142 |
|
126 | 143 |
|
127 | 144 |
if __name__ == '__main__': |
Also available in: Unified diff