Statistics
| Branch: | Tag: | Revision:

root / pithos / lib / queue.py @ c878682f

History | View | Annotate | Download (4.2 kB)

1 8f815802 Antony Chazapis
# Copyright 2012 GRNET S.A. All rights reserved.
2 8f815802 Antony Chazapis
#
3 8f815802 Antony Chazapis
# Redistribution and use in source and binary forms, with or
4 8f815802 Antony Chazapis
# without modification, are permitted provided that the following
5 8f815802 Antony Chazapis
# conditions are met:
6 8f815802 Antony Chazapis
#
7 8f815802 Antony Chazapis
#   1. Redistributions of source code must retain the above
8 8f815802 Antony Chazapis
#      copyright notice, this list of conditions and the following
9 8f815802 Antony Chazapis
#      disclaimer.
10 8f815802 Antony Chazapis
#
11 8f815802 Antony Chazapis
#   2. Redistributions in binary form must reproduce the above
12 8f815802 Antony Chazapis
#      copyright notice, this list of conditions and the following
13 8f815802 Antony Chazapis
#      disclaimer in the documentation and/or other materials
14 8f815802 Antony Chazapis
#      provided with the distribution.
15 8f815802 Antony Chazapis
#
16 8f815802 Antony Chazapis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 8f815802 Antony Chazapis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 8f815802 Antony Chazapis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 8f815802 Antony Chazapis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 8f815802 Antony Chazapis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 8f815802 Antony Chazapis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 8f815802 Antony Chazapis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 8f815802 Antony Chazapis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 8f815802 Antony Chazapis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 8f815802 Antony Chazapis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 8f815802 Antony Chazapis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 8f815802 Antony Chazapis
# POSSIBILITY OF SUCH DAMAGE.
28 8f815802 Antony Chazapis
#
29 8f815802 Antony Chazapis
# The views and conclusions contained in the software and
30 8f815802 Antony Chazapis
# documentation are those of the authors and should not be
31 8f815802 Antony Chazapis
# interpreted as representing official policies, either expressed
32 8f815802 Antony Chazapis
# or implied, of GRNET S.A.
33 8f815802 Antony Chazapis
34 8f815802 Antony Chazapis
import pika
35 8f815802 Antony Chazapis
import json
36 fa9cae7e Antony Chazapis
import uuid
37 8f815802 Antony Chazapis
38 8f815802 Antony Chazapis
from urlparse import urlparse
39 c878682f Antony Chazapis
from hashlib import sha1
40 c878682f Antony Chazapis
from random import random
41 fa9cae7e Antony Chazapis
from time import time
42 8f815802 Antony Chazapis
43 8f815802 Antony Chazapis
44 8f815802 Antony Chazapis
def exchange_connect(exchange, vhost='/'):
45 8f815802 Antony Chazapis
    """Format exchange as a URI: rabbitmq://user:pass@host:port/exchange"""
46 8f815802 Antony Chazapis
    parts = urlparse(exchange)
47 8f815802 Antony Chazapis
    if parts.scheme != 'rabbitmq':
48 8f815802 Antony Chazapis
        return None
49 8f815802 Antony Chazapis
    if len(parts.path) < 2 or not parts.path.startswith('/'):
50 8f815802 Antony Chazapis
        return None
51 8f815802 Antony Chazapis
    exchange = parts.path[1:]
52 8f815802 Antony Chazapis
    connection = pika.BlockingConnection(pika.ConnectionParameters(
53 8f815802 Antony Chazapis
                    host=parts.hostname, port=parts.port, virtual_host=vhost,
54 8f815802 Antony Chazapis
                    credentials=pika.PlainCredentials(parts.username, parts.password)))
55 8f815802 Antony Chazapis
    channel = connection.channel()
56 8f815802 Antony Chazapis
    channel.exchange_declare(exchange=exchange, type='topic', durable=True)
57 8f815802 Antony Chazapis
    return (connection, channel, exchange)
58 8f815802 Antony Chazapis
59 8f815802 Antony Chazapis
def exchange_close(conn):
60 8f815802 Antony Chazapis
    connection, channel, exchange = conn
61 8f815802 Antony Chazapis
    connection.close()
62 8f815802 Antony Chazapis
63 8f815802 Antony Chazapis
def exchange_send(conn, key, value):
64 8f815802 Antony Chazapis
    """Messages are sent to exchanges at a key."""
65 8f815802 Antony Chazapis
    connection, channel, exchange = conn
66 8f815802 Antony Chazapis
    channel.basic_publish(exchange=exchange,
67 8f815802 Antony Chazapis
                          routing_key=key,
68 8f815802 Antony Chazapis
                          body=json.dumps(value))
69 8f815802 Antony Chazapis
70 8f815802 Antony Chazapis
    
71 8f815802 Antony Chazapis
def exchange_route(conn, key, queue):
72 8f815802 Antony Chazapis
    """Set up routing of keys to queue."""
73 8f815802 Antony Chazapis
    connection, channel, exchange = conn
74 8f815802 Antony Chazapis
    channel.queue_declare(queue=queue, durable=True,
75 8f815802 Antony Chazapis
                          exclusive=False, auto_delete=False)
76 8f815802 Antony Chazapis
    channel.queue_bind(exchange=exchange,
77 8f815802 Antony Chazapis
                       queue=queue,
78 8f815802 Antony Chazapis
                       routing_key=key)
79 8f815802 Antony Chazapis
80 8f815802 Antony Chazapis
def queue_callback(conn, queue, cb):
81 8f815802 Antony Chazapis
    
82 8f815802 Antony Chazapis
    def handle_delivery(channel, method_frame, header_frame, body):
83 8f815802 Antony Chazapis
        #print 'Basic.Deliver %s delivery-tag %i: %s' % (header_frame.content_type,
84 8f815802 Antony Chazapis
        #                                                method_frame.delivery_tag,
85 8f815802 Antony Chazapis
        #                                                body)
86 8f815802 Antony Chazapis
        if cb:
87 8f815802 Antony Chazapis
            cb(json.loads(body))
88 8f815802 Antony Chazapis
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
89 8f815802 Antony Chazapis
    
90 8f815802 Antony Chazapis
    connection, channel, exchange = conn
91 8f815802 Antony Chazapis
    channel.basic_consume(handle_delivery, queue=queue)
92 8f815802 Antony Chazapis
93 8f815802 Antony Chazapis
def queue_start(conn):
94 8f815802 Antony Chazapis
    connection, channel, exchange = conn
95 8f815802 Antony Chazapis
    channel.start_consuming()
96 fa9cae7e Antony Chazapis
97 fa9cae7e Antony Chazapis
class Receipt(object):
98 813e42e5 Antony Chazapis
    def __init__(self, client, user, resource, value, details={}):
99 c878682f Antony Chazapis
        self.eventVersion = '1.0'
100 c878682f Antony Chazapis
        self.occurredMillis = int(time() * 1000)
101 c878682f Antony Chazapis
        self.receivedMillis = self.occurredMillis
102 c878682f Antony Chazapis
        self.clientID = client
103 c878682f Antony Chazapis
        self.userID = user
104 fa9cae7e Antony Chazapis
        self.resource = resource
105 fa9cae7e Antony Chazapis
        self.value = value
106 813e42e5 Antony Chazapis
        self.details = details
107 c878682f Antony Chazapis
        hash = sha1()
108 c878682f Antony Chazapis
        hash.update(json.dumps([client, user, resource, value, details, random()]))
109 c878682f Antony Chazapis
        self.id = hash.hexdigest()
110 fa9cae7e Antony Chazapis
    
111 fa9cae7e Antony Chazapis
    def format(self):
112 fa9cae7e Antony Chazapis
        return self.__dict__