Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / queue.py @ 5f6ad491

History | View | Annotate | Download (4.3 kB)

1 8acb1f97 Kostas Papadimitriou
# Copyright 2012 GRNET S.A. All rights reserved.
2 8acb1f97 Kostas Papadimitriou
#
3 8acb1f97 Kostas Papadimitriou
# Redistribution and use in source and binary forms, with or
4 8acb1f97 Kostas Papadimitriou
# without modification, are permitted provided that the following
5 8acb1f97 Kostas Papadimitriou
# conditions are met:
6 8acb1f97 Kostas Papadimitriou
#
7 8acb1f97 Kostas Papadimitriou
#   1. Redistributions of source code must retain the above
8 8acb1f97 Kostas Papadimitriou
#      copyright notice, this list of conditions and the following
9 8acb1f97 Kostas Papadimitriou
#      disclaimer.
10 8acb1f97 Kostas Papadimitriou
#
11 8acb1f97 Kostas Papadimitriou
#   2. Redistributions in binary form must reproduce the above
12 8acb1f97 Kostas Papadimitriou
#      copyright notice, this list of conditions and the following
13 8acb1f97 Kostas Papadimitriou
#      disclaimer in the documentation and/or other materials
14 8acb1f97 Kostas Papadimitriou
#      provided with the distribution.
15 8acb1f97 Kostas Papadimitriou
#
16 8acb1f97 Kostas Papadimitriou
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 8acb1f97 Kostas Papadimitriou
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 8acb1f97 Kostas Papadimitriou
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 8acb1f97 Kostas Papadimitriou
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 8acb1f97 Kostas Papadimitriou
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 8acb1f97 Kostas Papadimitriou
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 8acb1f97 Kostas Papadimitriou
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 8acb1f97 Kostas Papadimitriou
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 8acb1f97 Kostas Papadimitriou
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 8acb1f97 Kostas Papadimitriou
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 8acb1f97 Kostas Papadimitriou
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 8acb1f97 Kostas Papadimitriou
# POSSIBILITY OF SUCH DAMAGE.
28 8acb1f97 Kostas Papadimitriou
#
29 8acb1f97 Kostas Papadimitriou
# The views and conclusions contained in the software and
30 8acb1f97 Kostas Papadimitriou
# documentation are those of the authors and should not be
31 8acb1f97 Kostas Papadimitriou
# interpreted as representing official policies, either expressed
32 8acb1f97 Kostas Papadimitriou
# or implied, of GRNET S.A.
33 8acb1f97 Kostas Papadimitriou
34 8acb1f97 Kostas Papadimitriou
import pika
35 8acb1f97 Kostas Papadimitriou
import json
36 8acb1f97 Kostas Papadimitriou
import uuid
37 8acb1f97 Kostas Papadimitriou
38 8acb1f97 Kostas Papadimitriou
from urlparse import urlparse
39 8acb1f97 Kostas Papadimitriou
from hashlib import sha1
40 8acb1f97 Kostas Papadimitriou
from random import random
41 8acb1f97 Kostas Papadimitriou
from time import time
42 8acb1f97 Kostas Papadimitriou
43 8acb1f97 Kostas Papadimitriou
44 8acb1f97 Kostas Papadimitriou
def exchange_connect(exchange, vhost='/'):
45 8acb1f97 Kostas Papadimitriou
    """Format exchange as a URI: rabbitmq://user:pass@host:port/exchange"""
46 8acb1f97 Kostas Papadimitriou
    parts = urlparse(exchange)
47 8acb1f97 Kostas Papadimitriou
    if parts.scheme != 'rabbitmq':
48 8acb1f97 Kostas Papadimitriou
        return None
49 8acb1f97 Kostas Papadimitriou
    if len(parts.path) < 2 or not parts.path.startswith('/'):
50 8acb1f97 Kostas Papadimitriou
        return None
51 8acb1f97 Kostas Papadimitriou
    exchange = parts.path[1:]
52 8acb1f97 Kostas Papadimitriou
    connection = pika.BlockingConnection(pika.ConnectionParameters(
53 8acb1f97 Kostas Papadimitriou
                    host=parts.hostname, port=parts.port, virtual_host=vhost,
54 8acb1f97 Kostas Papadimitriou
                    credentials=pika.PlainCredentials(parts.username, parts.password)))
55 8acb1f97 Kostas Papadimitriou
    channel = connection.channel()
56 8acb1f97 Kostas Papadimitriou
    channel.exchange_declare(exchange=exchange, type='topic', durable=True)
57 8acb1f97 Kostas Papadimitriou
    return (connection, channel, exchange)
58 8acb1f97 Kostas Papadimitriou
59 8acb1f97 Kostas Papadimitriou
def exchange_close(conn):
60 8acb1f97 Kostas Papadimitriou
    connection, channel, exchange = conn
61 8acb1f97 Kostas Papadimitriou
    connection.close()
62 8acb1f97 Kostas Papadimitriou
63 8acb1f97 Kostas Papadimitriou
def exchange_send(conn, key, value):
64 8acb1f97 Kostas Papadimitriou
    """Messages are sent to exchanges at a key."""
65 8acb1f97 Kostas Papadimitriou
    connection, channel, exchange = conn
66 8acb1f97 Kostas Papadimitriou
    channel.basic_publish(exchange=exchange,
67 8acb1f97 Kostas Papadimitriou
                          routing_key=key,
68 8acb1f97 Kostas Papadimitriou
                          body=json.dumps(value))
69 8acb1f97 Kostas Papadimitriou
70 8acb1f97 Kostas Papadimitriou
    
71 8acb1f97 Kostas Papadimitriou
def exchange_route(conn, key, queue):
72 8acb1f97 Kostas Papadimitriou
    """Set up routing of keys to queue."""
73 8acb1f97 Kostas Papadimitriou
    connection, channel, exchange = conn
74 8acb1f97 Kostas Papadimitriou
    channel.queue_declare(queue=queue, durable=True,
75 8acb1f97 Kostas Papadimitriou
                          exclusive=False, auto_delete=False)
76 8acb1f97 Kostas Papadimitriou
    channel.queue_bind(exchange=exchange,
77 8acb1f97 Kostas Papadimitriou
                       queue=queue,
78 8acb1f97 Kostas Papadimitriou
                       routing_key=key)
79 8acb1f97 Kostas Papadimitriou
80 8acb1f97 Kostas Papadimitriou
def queue_callback(conn, queue, cb):
81 8acb1f97 Kostas Papadimitriou
    
82 8acb1f97 Kostas Papadimitriou
    def handle_delivery(channel, method_frame, header_frame, body):
83 8acb1f97 Kostas Papadimitriou
        #print 'Basic.Deliver %s delivery-tag %i: %s' % (header_frame.content_type,
84 8acb1f97 Kostas Papadimitriou
        #                                                method_frame.delivery_tag,
85 8acb1f97 Kostas Papadimitriou
        #                                                body)
86 8acb1f97 Kostas Papadimitriou
        if cb:
87 8acb1f97 Kostas Papadimitriou
            cb(json.loads(body))
88 8acb1f97 Kostas Papadimitriou
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
89 8acb1f97 Kostas Papadimitriou
    
90 8acb1f97 Kostas Papadimitriou
    connection, channel, exchange = conn
91 8acb1f97 Kostas Papadimitriou
    channel.basic_consume(handle_delivery, queue=queue)
92 8acb1f97 Kostas Papadimitriou
93 8acb1f97 Kostas Papadimitriou
def queue_start(conn):
94 8acb1f97 Kostas Papadimitriou
    connection, channel, exchange = conn
95 8acb1f97 Kostas Papadimitriou
    channel.start_consuming()
96 8acb1f97 Kostas Papadimitriou
97 8acb1f97 Kostas Papadimitriou
class Receipt(object):
98 21a69d65 Antony Chazapis
    def __init__(self, client, user, instance, resource, value, details={}):
99 8acb1f97 Kostas Papadimitriou
        self.eventVersion = '1.0'
100 8acb1f97 Kostas Papadimitriou
        self.occurredMillis = int(time() * 1000)
101 8acb1f97 Kostas Papadimitriou
        self.receivedMillis = self.occurredMillis
102 8acb1f97 Kostas Papadimitriou
        self.clientID = client
103 8acb1f97 Kostas Papadimitriou
        self.userID = user
104 21a69d65 Antony Chazapis
        self.instanceID = instance
105 8acb1f97 Kostas Papadimitriou
        self.resource = resource
106 8acb1f97 Kostas Papadimitriou
        self.value = value
107 8acb1f97 Kostas Papadimitriou
        self.details = details
108 8acb1f97 Kostas Papadimitriou
        hash = sha1()
109 8acb1f97 Kostas Papadimitriou
        hash.update(json.dumps([client, user, resource, value, details, random()]))
110 8acb1f97 Kostas Papadimitriou
        self.id = hash.hexdigest()
111 8acb1f97 Kostas Papadimitriou
    
112 8acb1f97 Kostas Papadimitriou
    def format(self):
113 8acb1f97 Kostas Papadimitriou
        return self.__dict__