Statistics
| Branch: | Tag: | Revision:

root / logic / amqp_connection.py @ a4d2780c

History | View | Annotate | Download (3.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
import time
31
import socket
32
from amqplib import client_0_8 as amqp
33
from django.conf import settings
34
from synnefo.logic import log
35

    
36
_conn = None
37
_chan = None
38
_logger = log.get_logger("amqplib")
39

    
40
def _connect():
41
    global _conn, _chan
42
    while _conn == None:
43
        try:
44
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
45
                                   userid=settings.RABBIT_USERNAME,
46
                                   password=settings.RABBIT_PASSWORD,
47
                                   virtual_host=settings.RABBIT_VHOST)
48
        except socket.error:
49
            time.sleep(1)
50
    _chan = _conn.channel()
51

    
52

    
53
def send(payload, exchange, key):
54
    """
55
        Send payload to the specified exchange using the provided routing key.
56

57
        This method will try reconnecting to the message server if a connection
58
        error occurs when sending the message. All other errors are forwarded
59
        to the client.
60
    """
61
    global _conn, _chan
62

    
63
    if payload is None:
64
        raise AMQPError("Message is empty")
65

    
66
    if exchange is None:
67
        raise AMQPError("Unknown exchange")
68

    
69
    if key is None:
70
        raise AMQPError("Unknown routing key")
71

    
72
    msg = amqp.Message(payload)
73
    msg.properties["delivery_mode"] = 2  # Persistent
74

    
75
    while True:
76
        try:
77
           _chan.basic_publish(msg,
78
                               exchange=exchange,
79
                               routing_key=key)
80
           return
81
        except socket.error as se:
82
           _logger.exception("Server went away, reconnecting...")
83
           _connect()
84
        except Exception as e:
85
            if _conn is None:
86
               _connect()
87
            else:
88
                _logger.exception('Caught unexpected exception (msg: %s)'%msg)
89
                raise AMQPError("Error sending message to exchange %s with \
90
                                key %s.Payload: %s. Error was: %s",
91
                                (exchange, key, payload, e.message))
92

    
93

    
94
def __init__():
95
    _connect()
96

    
97

    
98
class AMQPError(Exception):
99
    def __init__(self, msg):
100
        self.message = msg