Statistics
| Branch: | Tag: | Revision:

root / astakos / im / queue.py @ 963fde65

History | View | Annotate | Download (4 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import pika
35
import json
36
import uuid
37

    
38
from urlparse import urlparse
39
from time import time
40

    
41

    
42
def exchange_connect(exchange, vhost='/'):
43
    """Format exchange as a URI: rabbitmq://user:pass@host:port/exchange"""
44
    parts = urlparse(exchange)
45
    if parts.scheme != 'rabbitmq':
46
        return None
47
    if len(parts.path) < 2 or not parts.path.startswith('/'):
48
        return None
49
    exchange = parts.path[1:]
50
    connection = pika.BlockingConnection(pika.ConnectionParameters(
51
                    host=parts.hostname, port=parts.port, virtual_host=vhost,
52
                    credentials=pika.PlainCredentials(parts.username, parts.password)))
53
    channel = connection.channel()
54
    channel.exchange_declare(exchange=exchange, type='topic', durable=True)
55
    return (connection, channel, exchange)
56

    
57
def exchange_close(conn):
58
    connection, channel, exchange = conn
59
    connection.close()
60

    
61
def exchange_send(conn, key, value):
62
    """Messages are sent to exchanges at a key."""
63
    connection, channel, exchange = conn
64
    channel.basic_publish(exchange=exchange,
65
                          routing_key=key,
66
                          body=json.dumps(value))
67

    
68
    
69
def exchange_route(conn, key, queue):
70
    """Set up routing of keys to queue."""
71
    connection, channel, exchange = conn
72
    channel.queue_declare(queue=queue, durable=True,
73
                          exclusive=False, auto_delete=False)
74
    channel.queue_bind(exchange=exchange,
75
                       queue=queue,
76
                       routing_key=key)
77

    
78
def queue_callback(conn, queue, cb):
79
    
80
    def handle_delivery(channel, method_frame, header_frame, body):
81
        #print 'Basic.Deliver %s delivery-tag %i: %s' % (header_frame.content_type,
82
        #                                                method_frame.delivery_tag,
83
        #                                                body)
84
        if cb:
85
            cb(json.loads(body))
86
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
87
    
88
    connection, channel, exchange = conn
89
    channel.basic_consume(handle_delivery, queue=queue)
90

    
91
def queue_start(conn):
92
    connection, channel, exchange = conn
93
    channel.start_consuming()
94

    
95
class Receipt(object):
96
    def __init__(self, client, user, resource, value, details=None):
97
        self.eventVersion = 1
98
        self.id = str(uuid.uuid4())
99
        self.timestamp = int(time() * 1000)
100
        self.clientId = client
101
        self.userId = user
102
        self.resource = resource
103
        self.value = value
104
        if details:
105
            self.details = details
106
    
107
    def format(self):
108
        return self.__dict__