Revision f4fbb0fa snf-pithos-tools/pithos/tools/dispatcher.py

b/snf-pithos-tools/pithos/tools/dispatcher.py
35 35

  
36 36
import sys
37 37
import logging
38
import json
38 39

  
39
from synnefo.lib.queue import (exchange_connect, exchange_close,
40
                               exchange_send, exchange_route, queue_callback, queue_start)
40
from synnefo.lib.amqp import AMQPClient
41 41

  
42 42
from optparse import OptionParser
43 43

  
......
94 94
        level=logging.DEBUG if DEBUG else logging.INFO)
95 95
    logger = logging.getLogger('dispatcher')
96 96

  
97
    exchange = 'rabbitmq://%s:%s@%s:%s/%s' % (
98
        opts.user, opts.password, opts.host, opts.port, opts.exchange)
99
    connection = exchange_connect(exchange)
97
    host =  'amqp://%s:%s@%s:%s' % (opts.user, opts.password, opts.host, opts.port)
98
    queue = opts.queue
99
    key = opts.key
100
    exchange = opts.exchange
101
    
102
    client = AMQPClient(hosts=[host])
103
    client.connect()
104

  
100 105
    if opts.test:
101
        exchange_send(connection, opts.key, {"test": "0123456789"})
102
        exchange_close(connection)
106
        client.exchange_declare(exchange=exchange,
107
                                type='topic')
108
        client.basic_publish(exchange=exchange,
109
                             routing_key=key,
110
                             body= json.dumps({"test": "0123456789"}))
111
        client.close()
103 112
        sys.exit()
104 113

  
105 114
    callback = None
......
110 119
            cb_module = sys.modules[cb[0]]
111 120
            callback = getattr(cb_module, cb[1])
112 121

  
113
    def handle_message(msg):
114
        print msg
122
    def handle_message(client, msg):
115 123
        logger.debug('%s', msg)
116 124
        if callback:
117 125
            callback(msg)
126
        client.basic_ack(msg)
127

  
128
    client.queue_declare(queue=queue)
129
    client.queue_bind(queue=queue,
130
                      exchange=exchange,
131
                      routing_key=key)
132

  
133
    client.basic_consume(queue=queue, callback=handle_message)
118 134

  
119
    exchange_route(connection, opts.key, opts.queue)
120
    queue_callback(connection, opts.queue, handle_message)
121 135
    try:
122
        queue_start(connection)
136
        while True:
137
            client.basic_wait()
123 138
    except KeyboardInterrupt:
124 139
        pass
140
    finally:
141
        client.close()
125 142

  
126 143

  
127 144
if __name__ == '__main__':

Also available in: Unified diff