Statistics
| Branch: | Tag: | Revision:

root / logic / amqp_connection.py @ 583bfaa0

History | View | Annotate | Download (3.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
import time
31
import socket
32
from amqplib import client_0_8 as amqp
33
from django.conf import settings
34

    
35
_conn = None
36
_chan = None
37

    
38
def _connect():
39
    global _conn, _chan
40
    while _conn == None:
41
        try:
42
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
43
                                   userid=settings.RABBIT_USERNAME,
44
                                   password=settings.RABBIT_PASSWORD,
45
                                   virtual_host=settings.RABBIT_VHOST)
46
        except socket.error:
47
            time.sleep(1)
48
    _chan = _conn.channel()
49

    
50

    
51
def send(payload, exchange, key):
52
    """
53
        Send payload to the specified exchange using the provided routing key.
54

55
        This method will try reconnecting to the message server if a connection
56
        error occurs when sending the message. All other errors are forwarded
57
        to the client.
58
    """
59
    global _conn, _chan
60

    
61
    if payload is None:
62
        raise AMQPError("Message is empty")
63

    
64
    if exchange is None:
65
        raise AMQPError("Unknown exchange")
66

    
67
    if key is None:
68
        raise AMQPError("Unknown routing key")
69

    
70
    msg = amqp.Message(payload)
71
    msg.properties["delivery_mode"] = 2  # Persistent
72

    
73
    while True:
74
        try:
75
           _chan.basic_publish(msg,
76
                               exchange=exchange,
77
                               routing_key=key)
78
           return
79
        except socket.error:
80
           #logger.exception("Server went away, reconnecting...")
81
           _connect()
82
        except Exception as e:
83
            if _conn is None:
84
               _connect()
85
            else:
86
                #self.logger.exception("Caught unexpected exception (msg: %s)", msg)
87
               raise AMQPError("Error sending message to exchange %s with key %s. Payload: %s. Error was: %s",
88
               (exchange, key, payload, e.message))
89

    
90
def __init__():
91
    _connect()
92

    
93

    
94
class AMQPError(Exception):
95
    def __init__(self, msg):
96
        self.message = msg