Revision 4cfccdd2 snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py
b/snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from synnefo.lib.queue import exchange_connect, exchange_send, exchange_close, Receipt
|
|
34 |
import json
|
|
35 | 35 |
|
36 |
from synnefo.lib.amqp import AMQPClient |
|
37 |
from synnefo.lib.queue import Receipt |
|
36 | 38 |
|
37 | 39 |
class Queue(object): |
38 | 40 |
"""Queue. |
39 |
Required constructor parameters: exchange, client_id. |
|
41 |
Required constructor parameters: hosts, exchange, client_id.
|
|
40 | 42 |
""" |
41 | 43 |
|
42 | 44 |
def __init__(self, **params): |
43 |
exchange = params['exchange']
|
|
44 |
self.conn = exchange_connect(exchange)
|
|
45 |
hosts = params['hosts']
|
|
46 |
self.exchange = params['exchange']
|
|
45 | 47 |
self.client_id = params['client_id'] |
46 | 48 |
|
47 |
def send(self, message_key, user, instance, resource, value, details): |
|
48 |
body = Receipt( |
|
49 |
self.client_id, user, instance, resource, value, details).format() |
|
50 |
exchange_send(self.conn, message_key, body) |
|
49 |
self.client = AMQPClient(hosts=hosts) |
|
50 |
self.client.connect() |
|
51 |
|
|
52 |
self.client.exchange_declare(exchange=self.exchange, |
|
53 |
type='topic') |
|
51 | 54 |
|
55 |
def send(self, message_key, user, instance, resource, value, details): |
|
56 |
body = Receipt(self.client_id, user, instance, resource, value, details).format() |
|
57 |
self.client.basic_publish(exchange=self.exchange, |
|
58 |
routing_key=message_key, |
|
59 |
body=json.dumps(body)) |
|
60 |
|
|
52 | 61 |
def close(self): |
53 |
exchange_close(self.conn) |
|
62 |
self.client.close() |
Also available in: Unified diff