Statistics
| Branch: | Tag: | Revision:

root / logic / amqp_connection.py @ 9e98ba3c

History | View | Annotate | Download (3.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
import time
31
import socket
32
from amqplib import client_0_8 as amqp
33
from django.conf import settings
34

    
35
from synnefo.util.log import getLogger
36

    
37

    
38
log = getLogger('synnefo.logic')
39

    
40
_conn = None
41
_chan = None
42

    
43

    
44
def _connect():
45
    global _conn, _chan
46
    # Force the _conn object to re-initialize
47
    _conn = None
48
    retry = 0
49
    while _conn == None:
50
        try:
51
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
52
                                   userid=settings.RABBIT_USERNAME,
53
                                   password=settings.RABBIT_PASSWORD,
54
                                   virtual_host=settings.RABBIT_VHOST)
55
        except socket.error:
56
            retry += 1
57
            if retry < 5 :
58
                log.exception("Cannot establish connection to AMQP. Retrying...")
59
                time.sleep(1)
60
            else:
61
                raise AMQPError("Queue error")
62
    _chan = _conn.channel()
63

    
64

    
65
def send(payload, exchange, key):
66
    """
67
        Send payload to the specified exchange using the provided routing key.
68

69
        This method will try reconnecting to the message server if a connection
70
        error occurs when sending the message. All other errors are forwarded
71
        to the client.
72
    """
73
    global _conn, _chan
74

    
75
    if payload is None:
76
        raise AMQPError("Message is empty")
77

    
78
    if exchange is None:
79
        raise AMQPError("Unknown exchange")
80

    
81
    if key is None:
82
        raise AMQPError("Unknown routing key")
83

    
84
    msg = amqp.Message(payload)
85
    msg.properties["delivery_mode"] = 2  # Persistent
86

    
87
    while True:
88
        try:
89
           _chan.basic_publish(msg,
90
                               exchange=exchange,
91
                               routing_key=key)
92
           return
93
        except socket.error as se:
94
           log.exception("Server went away, reconnecting...")
95
           _connect()
96
        except Exception as e:
97
            if _conn is None:
98
               _connect()
99
            else:
100
                log.exception('Caught unexpected exception (msg: %s)', msg)
101
                raise AMQPError("Error sending message to exchange %s with \
102
                                key %s.Payload: %s. Error was: %s",
103
                                (exchange, key, payload, e.message))
104

    
105

    
106
def __init__():
107
    _connect()
108

    
109
class AMQPError(Exception):
110
    pass