Revision 963fde65

b/astakos/im/queue.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
import pika
35
import json
36
import uuid
37

  
38
from urlparse import urlparse
39
from time import time
40

  
41

  
42
def exchange_connect(exchange, vhost='/'):
43
    """Format exchange as a URI: rabbitmq://user:pass@host:port/exchange"""
44
    parts = urlparse(exchange)
45
    if parts.scheme != 'rabbitmq':
46
        return None
47
    if len(parts.path) < 2 or not parts.path.startswith('/'):
48
        return None
49
    exchange = parts.path[1:]
50
    connection = pika.BlockingConnection(pika.ConnectionParameters(
51
                    host=parts.hostname, port=parts.port, virtual_host=vhost,
52
                    credentials=pika.PlainCredentials(parts.username, parts.password)))
53
    channel = connection.channel()
54
    channel.exchange_declare(exchange=exchange, type='topic', durable=True)
55
    return (connection, channel, exchange)
56

  
57
def exchange_close(conn):
58
    connection, channel, exchange = conn
59
    connection.close()
60

  
61
def exchange_send(conn, key, value):
62
    """Messages are sent to exchanges at a key."""
63
    connection, channel, exchange = conn
64
    channel.basic_publish(exchange=exchange,
65
                          routing_key=key,
66
                          body=json.dumps(value))
67

  
68
    
69
def exchange_route(conn, key, queue):
70
    """Set up routing of keys to queue."""
71
    connection, channel, exchange = conn
72
    channel.queue_declare(queue=queue, durable=True,
73
                          exclusive=False, auto_delete=False)
74
    channel.queue_bind(exchange=exchange,
75
                       queue=queue,
76
                       routing_key=key)
77

  
78
def queue_callback(conn, queue, cb):
79
    
80
    def handle_delivery(channel, method_frame, header_frame, body):
81
        #print 'Basic.Deliver %s delivery-tag %i: %s' % (header_frame.content_type,
82
        #                                                method_frame.delivery_tag,
83
        #                                                body)
84
        if cb:
85
            cb(json.loads(body))
86
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
87
    
88
    connection, channel, exchange = conn
89
    channel.basic_consume(handle_delivery, queue=queue)
90

  
91
def queue_start(conn):
92
    connection, channel, exchange = conn
93
    channel.start_consuming()
94

  
95
class Receipt(object):
96
    def __init__(self, client, user, resource, value, details=None):
97
        self.eventVersion = 1
98
        self.id = str(uuid.uuid4())
99
        self.timestamp = int(time() * 1000)
100
        self.clientId = client
101
        self.userId = user
102
        self.resource = resource
103
        self.value = value
104
        if details:
105
            self.details = details
106
    
107
    def format(self):
108
        return self.__dict__
b/astakos/im/rabbitmq/__init__.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from queue import Queue
35

  
36
__all__ = ["Queue"]
37

  
b/astakos/im/rabbitmq/queue.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from astakos.im.queue import exchange_connect, exchange_send, Receipt
35

  
36
class Queue(object):
37
    """Queue.
38
       Required constructor parameters: exchange, message_key, client_id.
39
    """
40
    
41
    def __init__(self, **params):
42
        exchange = params['exchange']
43
        self.conn = exchange_connect(exchange)
44
        self.message_key = params['message_key']
45
        self.client_id = params['client_id']
46
    
47
    def send(self, user, resource, value, details):
48
        body = Receipt(self.client_id, user, resource, value, details).format()
49
        exchange_send(self.conn, self.message_key, body)
50

  

Also available in: Unified diff