Statistics
| Branch: | Tag: | Revision:

root / logic / amqp_connection.py @ 404ccab2

History | View | Annotate | Download (3.8 kB)

1 464a3579 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 464a3579 Georgios Gousios
#
3 464a3579 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 464a3579 Georgios Gousios
# modification, are permitted provided that the following conditions
5 464a3579 Georgios Gousios
# are met:
6 464a3579 Georgios Gousios
#
7 464a3579 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 464a3579 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 464a3579 Georgios Gousios
#
10 464a3579 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 464a3579 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 464a3579 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 464a3579 Georgios Gousios
#
14 464a3579 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 464a3579 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 464a3579 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 464a3579 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 464a3579 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 464a3579 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 464a3579 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 464a3579 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 464a3579 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 464a3579 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 464a3579 Georgios Gousios
# SUCH DAMAGE.
25 464a3579 Georgios Gousios
#
26 464a3579 Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 464a3579 Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 464a3579 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 464a3579 Georgios Gousios
30 4f6e36d9 Georgios Gousios
import time
31 75d20793 Georgios Gousios
import socket
32 4f6e36d9 Georgios Gousios
from amqplib import client_0_8 as amqp
33 4f6e36d9 Georgios Gousios
from django.conf import settings
34 f2bdb9ab Georgios Gousios
from synnefo.logic import log
35 4f6e36d9 Georgios Gousios
36 75d20793 Georgios Gousios
_conn = None
37 75d20793 Georgios Gousios
_chan = None
38 f2bdb9ab Georgios Gousios
_logger = log.get_logger("amqplib")
39 4f6e36d9 Georgios Gousios
40 75d20793 Georgios Gousios
def _connect():
41 75d20793 Georgios Gousios
    global _conn, _chan
42 29053267 Georgios Gousios
    # Force the _conn object to re-initialize
43 29053267 Georgios Gousios
    _conn = None
44 05e404b8 Georgios Gousios
    retry = 0
45 75d20793 Georgios Gousios
    while _conn == None:
46 75d20793 Georgios Gousios
        try:
47 75d20793 Georgios Gousios
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
48 75d20793 Georgios Gousios
                                   userid=settings.RABBIT_USERNAME,
49 75d20793 Georgios Gousios
                                   password=settings.RABBIT_PASSWORD,
50 75d20793 Georgios Gousios
                                   virtual_host=settings.RABBIT_VHOST)
51 75d20793 Georgios Gousios
        except socket.error:
52 05e404b8 Georgios Gousios
            retry += 1
53 05e404b8 Georgios Gousios
            if retry < 5 :
54 05e404b8 Georgios Gousios
                _logger.exception("Cannot establish connection to AMQP. Retrying...")
55 05e404b8 Georgios Gousios
                time.sleep(1)
56 05e404b8 Georgios Gousios
            else:
57 05e404b8 Georgios Gousios
                raise AMQPError("Queue error")
58 75d20793 Georgios Gousios
    _chan = _conn.channel()
59 4f6e36d9 Georgios Gousios
60 4f6e36d9 Georgios Gousios
61 75d20793 Georgios Gousios
def send(payload, exchange, key):
62 5ad78098 Georgios Gousios
    """
63 464a3579 Georgios Gousios
        Send payload to the specified exchange using the provided routing key.
64 464a3579 Georgios Gousios

65 464a3579 Georgios Gousios
        This method will try reconnecting to the message server if a connection
66 464a3579 Georgios Gousios
        error occurs when sending the message. All other errors are forwarded
67 464a3579 Georgios Gousios
        to the client.
68 5ad78098 Georgios Gousios
    """
69 75d20793 Georgios Gousios
    global _conn, _chan
70 6afc46cb Georgios Gousios
71 6afc46cb Georgios Gousios
    if payload is None:
72 6afc46cb Georgios Gousios
        raise AMQPError("Message is empty")
73 6afc46cb Georgios Gousios
74 6afc46cb Georgios Gousios
    if exchange is None:
75 6afc46cb Georgios Gousios
        raise AMQPError("Unknown exchange")
76 6afc46cb Georgios Gousios
77 6afc46cb Georgios Gousios
    if key is None:
78 6afc46cb Georgios Gousios
        raise AMQPError("Unknown routing key")
79 6afc46cb Georgios Gousios
80 75d20793 Georgios Gousios
    msg = amqp.Message(payload)
81 75d20793 Georgios Gousios
    msg.properties["delivery_mode"] = 2  # Persistent
82 4f6e36d9 Georgios Gousios
83 75d20793 Georgios Gousios
    while True:
84 6afc46cb Georgios Gousios
        try:
85 75d20793 Georgios Gousios
           _chan.basic_publish(msg,
86 75d20793 Georgios Gousios
                               exchange=exchange,
87 75d20793 Georgios Gousios
                               routing_key=key)
88 75d20793 Georgios Gousios
           return
89 f2bdb9ab Georgios Gousios
        except socket.error as se:
90 f2bdb9ab Georgios Gousios
           _logger.exception("Server went away, reconnecting...")
91 75d20793 Georgios Gousios
           _connect()
92 6afc46cb Georgios Gousios
        except Exception as e:
93 6afc46cb Georgios Gousios
            if _conn is None:
94 6afc46cb Georgios Gousios
               _connect()
95 6afc46cb Georgios Gousios
            else:
96 190ac01d Georgios Gousios
                _logger.exception('Caught unexpected exception (msg: %s)' % msg)
97 f2bdb9ab Georgios Gousios
                raise AMQPError("Error sending message to exchange %s with \
98 f2bdb9ab Georgios Gousios
                                key %s.Payload: %s. Error was: %s",
99 f2bdb9ab Georgios Gousios
                                (exchange, key, payload, e.message))
100 f2bdb9ab Georgios Gousios
101 75d20793 Georgios Gousios
102 75d20793 Georgios Gousios
def __init__():
103 75d20793 Georgios Gousios
    _connect()
104 6afc46cb Georgios Gousios
105 6afc46cb Georgios Gousios
class AMQPError(Exception):
106 05e404b8 Georgios Gousios
    pass