Statistics
| Branch: | Tag: | Revision:

root / pithos / lib / queue.py @ 8f815802

History | View | Annotate | Download (3.6 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import pika
35
import json
36

    
37
from urlparse import urlparse
38

    
39

    
40
def exchange_connect(exchange, vhost='/'):
41
    """Format exchange as a URI: rabbitmq://user:pass@host:port/exchange"""
42
    parts = urlparse(exchange)
43
    if parts.scheme != 'rabbitmq':
44
        return None
45
    if len(parts.path) < 2 or not parts.path.startswith('/'):
46
        return None
47
    exchange = parts.path[1:]
48
    connection = pika.BlockingConnection(pika.ConnectionParameters(
49
                    host=parts.hostname, port=parts.port, virtual_host=vhost,
50
                    credentials=pika.PlainCredentials(parts.username, parts.password)))
51
    channel = connection.channel()
52
    channel.exchange_declare(exchange=exchange, type='topic', durable=True)
53
    return (connection, channel, exchange)
54

    
55
def exchange_close(conn):
56
    connection, channel, exchange = conn
57
    connection.close()
58

    
59
def exchange_send(conn, key, value):
60
    """Messages are sent to exchanges at a key."""
61
    connection, channel, exchange = conn
62
    channel.basic_publish(exchange=exchange,
63
                          routing_key=key,
64
                          body=json.dumps(value))
65

    
66
    
67
def exchange_route(conn, key, queue):
68
    """Set up routing of keys to queue."""
69
    connection, channel, exchange = conn
70
    channel.queue_declare(queue=queue, durable=True,
71
                          exclusive=False, auto_delete=False)
72
    channel.queue_bind(exchange=exchange,
73
                       queue=queue,
74
                       routing_key=key)
75

    
76
def queue_callback(conn, queue, cb):
77
    
78
    def handle_delivery(channel, method_frame, header_frame, body):
79
        #print 'Basic.Deliver %s delivery-tag %i: %s' % (header_frame.content_type,
80
        #                                                method_frame.delivery_tag,
81
        #                                                body)
82
        if cb:
83
            cb(json.loads(body))
84
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
85
    
86
    connection, channel, exchange = conn
87
    channel.basic_consume(handle_delivery, queue=queue)
88

    
89
def queue_start(conn):
90
    connection, channel, exchange = conn
91
    channel.start_consuming()