Revision f4fbb0fa snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py

b/snf-pithos-backend/pithos/backends/lib/rabbitmq/queue.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from synnefo.lib.queue import exchange_connect, exchange_send, exchange_close, Receipt
34
import json
35 35

  
36
from synnefo.lib.amqp import AMQPClient
37
from synnefo.lib.queue import Receipt
36 38

  
37 39
class Queue(object):
38 40
    """Queue.
39
       Required constructor parameters: exchange, client_id.
41
       Required constructor parameters: hosts, exchange, client_id.
40 42
    """
41 43

  
42 44
    def __init__(self, **params):
43
        exchange = params['exchange']
44
        self.conn = exchange_connect(exchange)
45
        hosts = params['hosts']
46
        self.exchange = params['exchange']
45 47
        self.client_id = params['client_id']
46 48

  
47
    def send(self, message_key, user, instance, resource, value, details):
48
        body = Receipt(
49
            self.client_id, user, instance, resource, value, details).format()
50
        exchange_send(self.conn, message_key, body)
49
        self.client = AMQPClient(hosts=hosts)
50
        self.client.connect()
51

  
52
        self.client.exchange_declare(exchange=self.exchange,
53
                                     type='topic')
51 54

  
55
    def send(self, message_key, user, instance, resource, value, details):
56
        body = Receipt(self.client_id, user, instance, resource, value, details).format()
57
        self.client.basic_publish(exchange=self.exchange,
58
                                  routing_key=message_key,
59
                                  body=json.dumps(body))
60
    
52 61
    def close(self):
53
        exchange_close(self.conn)
62
        self.client.close()

Also available in: Unified diff