Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / amqp_connection.py @ 3c755209

History | View | Annotate | Download (3.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
import time
31
import socket
32

    
33
from logging import getLogger
34

    
35
from amqplib import client_0_8 as amqp
36
from django.conf import settings
37

    
38

    
39
log = getLogger('synnefo.logic')
40

    
41
_conn = None
42
_chan = None
43

    
44

    
45
def _connect():
46
    global _conn, _chan
47
    # Force the _conn object to re-initialize
48
    _conn = None
49
    retry = 0
50
    while _conn == None:
51
        try:
52
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
53
                                   userid=settings.RABBIT_USERNAME,
54
                                   password=settings.RABBIT_PASSWORD,
55
                                   virtual_host=settings.RABBIT_VHOST)
56
        except socket.error:
57
            retry += 1
58
            if retry < 5 :
59
                log.exception("Cannot establish connection to AMQP. Retrying...")
60
                time.sleep(1)
61
            else:
62
                raise AMQPError("Queue error")
63
    _chan = _conn.channel()
64

    
65

    
66
def send(payload, exchange, key):
67
    """
68
        Send payload to the specified exchange using the provided routing key.
69

70
        This method will try reconnecting to the message server if a connection
71
        error occurs when sending the message. All other errors are forwarded
72
        to the client.
73
    """
74
    global _conn, _chan
75

    
76
    if payload is None:
77
        raise AMQPError("Message is empty")
78

    
79
    if exchange is None:
80
        raise AMQPError("Unknown exchange")
81

    
82
    if key is None:
83
        raise AMQPError("Unknown routing key")
84

    
85
    msg = amqp.Message(payload)
86
    msg.properties["delivery_mode"] = 2  # Persistent
87

    
88
    while True:
89
        try:
90
           _chan.basic_publish(msg,
91
                               exchange=exchange,
92
                               routing_key=key)
93
           return
94
        except socket.error as se:
95
           log.exception("Server went away, reconnecting...")
96
           _connect()
97
        except Exception as e:
98
            if _conn is None:
99
               _connect()
100
            else:
101
                log.exception('Caught unexpected exception (msg: %s)', msg)
102
                raise AMQPError("Error sending message to exchange %s with \
103
                                key %s.Payload: %s. Error was: %s",
104
                                (exchange, key, payload, e.message))
105

    
106

    
107
def __init__():
108
    _connect()
109

    
110
class AMQPError(Exception):
111
    pass