Statistics
| Branch: | Tag: | Revision:

root / logic / amqp_connection.py @ 76a429fb

History | View | Annotate | Download (3.4 kB)

1 464a3579 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 464a3579 Georgios Gousios
#
3 464a3579 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 464a3579 Georgios Gousios
# modification, are permitted provided that the following conditions
5 464a3579 Georgios Gousios
# are met:
6 464a3579 Georgios Gousios
#
7 464a3579 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 464a3579 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 464a3579 Georgios Gousios
#
10 464a3579 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 464a3579 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 464a3579 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 464a3579 Georgios Gousios
#
14 464a3579 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 464a3579 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 464a3579 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 464a3579 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 464a3579 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 464a3579 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 464a3579 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 464a3579 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 464a3579 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 464a3579 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 464a3579 Georgios Gousios
# SUCH DAMAGE.
25 464a3579 Georgios Gousios
#
26 464a3579 Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 464a3579 Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 464a3579 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 464a3579 Georgios Gousios
30 4f6e36d9 Georgios Gousios
import time
31 75d20793 Georgios Gousios
import socket
32 4f6e36d9 Georgios Gousios
from amqplib import client_0_8 as amqp
33 4f6e36d9 Georgios Gousios
from django.conf import settings
34 4f6e36d9 Georgios Gousios
35 75d20793 Georgios Gousios
_conn = None
36 75d20793 Georgios Gousios
_chan = None
37 4f6e36d9 Georgios Gousios
38 75d20793 Georgios Gousios
def _connect():
39 75d20793 Georgios Gousios
    global _conn, _chan
40 75d20793 Georgios Gousios
    while _conn == None:
41 75d20793 Georgios Gousios
        try:
42 75d20793 Georgios Gousios
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
43 75d20793 Georgios Gousios
                                   userid=settings.RABBIT_USERNAME,
44 75d20793 Georgios Gousios
                                   password=settings.RABBIT_PASSWORD,
45 75d20793 Georgios Gousios
                                   virtual_host=settings.RABBIT_VHOST)
46 75d20793 Georgios Gousios
        except socket.error:
47 75d20793 Georgios Gousios
            time.sleep(1)
48 75d20793 Georgios Gousios
    _chan = _conn.channel()
49 4f6e36d9 Georgios Gousios
50 4f6e36d9 Georgios Gousios
51 75d20793 Georgios Gousios
def send(payload, exchange, key):
52 5ad78098 Georgios Gousios
    """
53 464a3579 Georgios Gousios
        Send payload to the specified exchange using the provided routing key.
54 464a3579 Georgios Gousios

55 464a3579 Georgios Gousios
        This method will try reconnecting to the message server if a connection
56 464a3579 Georgios Gousios
        error occurs when sending the message. All other errors are forwarded
57 464a3579 Georgios Gousios
        to the client.
58 5ad78098 Georgios Gousios
    """
59 75d20793 Georgios Gousios
    global _conn, _chan
60 6afc46cb Georgios Gousios
61 6afc46cb Georgios Gousios
    if payload is None:
62 6afc46cb Georgios Gousios
        raise AMQPError("Message is empty")
63 6afc46cb Georgios Gousios
64 6afc46cb Georgios Gousios
    if exchange is None:
65 6afc46cb Georgios Gousios
        raise AMQPError("Unknown exchange")
66 6afc46cb Georgios Gousios
67 6afc46cb Georgios Gousios
    if key is None:
68 6afc46cb Georgios Gousios
        raise AMQPError("Unknown routing key")
69 6afc46cb Georgios Gousios
70 75d20793 Georgios Gousios
    msg = amqp.Message(payload)
71 75d20793 Georgios Gousios
    msg.properties["delivery_mode"] = 2  # Persistent
72 4f6e36d9 Georgios Gousios
73 75d20793 Georgios Gousios
    while True:
74 6afc46cb Georgios Gousios
        try:
75 75d20793 Georgios Gousios
           _chan.basic_publish(msg,
76 75d20793 Georgios Gousios
                               exchange=exchange,
77 75d20793 Georgios Gousios
                               routing_key=key)
78 75d20793 Georgios Gousios
           return
79 6afc46cb Georgios Gousios
        except socket.error:
80 75d20793 Georgios Gousios
           #logger.exception("Server went away, reconnecting...")
81 75d20793 Georgios Gousios
           _connect()
82 6afc46cb Georgios Gousios
        except Exception as e:
83 6afc46cb Georgios Gousios
            if _conn is None:
84 6afc46cb Georgios Gousios
               _connect()
85 6afc46cb Georgios Gousios
            else:
86 6afc46cb Georgios Gousios
                #self.logger.exception("Caught unexpected exception (msg: %s)", msg)
87 6afc46cb Georgios Gousios
               raise AMQPError("Error sending message to exchange %s with key %s. Payload: %s. Error was: %s",
88 6afc46cb Georgios Gousios
               (exchange, key, payload, e.message))
89 75d20793 Georgios Gousios
90 75d20793 Georgios Gousios
def __init__():
91 75d20793 Georgios Gousios
    _connect()
92 6afc46cb Georgios Gousios
93 6afc46cb Georgios Gousios
94 6afc46cb Georgios Gousios
class AMQPError(Exception):
95 6afc46cb Georgios Gousios
    def __init__(self, msg):
96 6afc46cb Georgios Gousios
        self.message = msg