root / logic / amqp_connection.py @ a4d2780c
History | View | Annotate | Download (3.5 kB)
1 |
# Copyright 2011 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or without
|
4 |
# modification, are permitted provided that the following conditions
|
5 |
# are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above copyright
|
8 |
# notice, this list of conditions and the following disclaimer.
|
9 |
#
|
10 |
# 2. Redistributions in binary form must reproduce the above copyright
|
11 |
# notice, this list of conditions and the following disclaimer in the
|
12 |
# documentation and/or other materials provided with the distribution.
|
13 |
#
|
14 |
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
15 |
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
16 |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
17 |
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
18 |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
19 |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
20 |
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
21 |
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
22 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
23 |
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
24 |
# SUCH DAMAGE.
|
25 |
#
|
26 |
# The views and conclusions contained in the software and documentation are
|
27 |
# those of the authors and should not be interpreted as representing official
|
28 |
# policies, either expressed or implied, of GRNET S.A.
|
29 |
|
30 |
import time |
31 |
import socket |
32 |
from amqplib import client_0_8 as amqp |
33 |
from django.conf import settings |
34 |
from synnefo.logic import log |
35 |
|
36 |
_conn = None
|
37 |
_chan = None
|
38 |
_logger = log.get_logger("amqplib")
|
39 |
|
40 |
def _connect(): |
41 |
global _conn, _chan
|
42 |
while _conn == None: |
43 |
try:
|
44 |
_conn = amqp.Connection(host=settings.RABBIT_HOST, |
45 |
userid=settings.RABBIT_USERNAME, |
46 |
password=settings.RABBIT_PASSWORD, |
47 |
virtual_host=settings.RABBIT_VHOST) |
48 |
except socket.error:
|
49 |
time.sleep(1)
|
50 |
_chan = _conn.channel() |
51 |
|
52 |
|
53 |
def send(payload, exchange, key): |
54 |
"""
|
55 |
Send payload to the specified exchange using the provided routing key.
|
56 |
|
57 |
This method will try reconnecting to the message server if a connection
|
58 |
error occurs when sending the message. All other errors are forwarded
|
59 |
to the client.
|
60 |
"""
|
61 |
global _conn, _chan
|
62 |
|
63 |
if payload is None: |
64 |
raise AMQPError("Message is empty") |
65 |
|
66 |
if exchange is None: |
67 |
raise AMQPError("Unknown exchange") |
68 |
|
69 |
if key is None: |
70 |
raise AMQPError("Unknown routing key") |
71 |
|
72 |
msg = amqp.Message(payload) |
73 |
msg.properties["delivery_mode"] = 2 # Persistent |
74 |
|
75 |
while True: |
76 |
try:
|
77 |
_chan.basic_publish(msg, |
78 |
exchange=exchange, |
79 |
routing_key=key) |
80 |
return
|
81 |
except socket.error as se: |
82 |
_logger.exception("Server went away, reconnecting...")
|
83 |
_connect() |
84 |
except Exception as e: |
85 |
if _conn is None: |
86 |
_connect() |
87 |
else:
|
88 |
_logger.exception('Caught unexpected exception (msg: %s)'%msg)
|
89 |
raise AMQPError("Error sending message to exchange %s with \ |
90 |
key %s.Payload: %s. Error was: %s",
|
91 |
(exchange, key, payload, e.message)) |
92 |
|
93 |
|
94 |
def __init__(): |
95 |
_connect() |
96 |
|
97 |
|
98 |
class AMQPError(Exception): |
99 |
def __init__(self, msg): |
100 |
self.message = msg
|