Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / amqp_connection.py @ 3c755209

History | View | Annotate | Download (3.8 kB)

1 464a3579 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 464a3579 Georgios Gousios
#
3 464a3579 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 464a3579 Georgios Gousios
# modification, are permitted provided that the following conditions
5 464a3579 Georgios Gousios
# are met:
6 464a3579 Georgios Gousios
#
7 464a3579 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 464a3579 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 464a3579 Georgios Gousios
#
10 464a3579 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 464a3579 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 464a3579 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 464a3579 Georgios Gousios
#
14 464a3579 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 464a3579 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 464a3579 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 464a3579 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 464a3579 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 464a3579 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 464a3579 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 464a3579 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 464a3579 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 464a3579 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 464a3579 Georgios Gousios
# SUCH DAMAGE.
25 464a3579 Georgios Gousios
#
26 464a3579 Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 464a3579 Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 464a3579 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 464a3579 Georgios Gousios
30 4f6e36d9 Georgios Gousios
import time
31 75d20793 Georgios Gousios
import socket
32 2035039b Giorgos Verigakis
33 2035039b Giorgos Verigakis
from logging import getLogger
34 2035039b Giorgos Verigakis
35 4f6e36d9 Georgios Gousios
from amqplib import client_0_8 as amqp
36 4f6e36d9 Georgios Gousios
from django.conf import settings
37 9e98ba3c Giorgos Verigakis
38 9e98ba3c Giorgos Verigakis
39 9e98ba3c Giorgos Verigakis
log = getLogger('synnefo.logic')
40 4f6e36d9 Georgios Gousios
41 75d20793 Georgios Gousios
_conn = None
42 75d20793 Georgios Gousios
_chan = None
43 9e98ba3c Giorgos Verigakis
44 4f6e36d9 Georgios Gousios
45 75d20793 Georgios Gousios
def _connect():
46 75d20793 Georgios Gousios
    global _conn, _chan
47 29053267 Georgios Gousios
    # Force the _conn object to re-initialize
48 29053267 Georgios Gousios
    _conn = None
49 05e404b8 Georgios Gousios
    retry = 0
50 75d20793 Georgios Gousios
    while _conn == None:
51 75d20793 Georgios Gousios
        try:
52 75d20793 Georgios Gousios
            _conn = amqp.Connection(host=settings.RABBIT_HOST,
53 75d20793 Georgios Gousios
                                   userid=settings.RABBIT_USERNAME,
54 75d20793 Georgios Gousios
                                   password=settings.RABBIT_PASSWORD,
55 75d20793 Georgios Gousios
                                   virtual_host=settings.RABBIT_VHOST)
56 75d20793 Georgios Gousios
        except socket.error:
57 05e404b8 Georgios Gousios
            retry += 1
58 05e404b8 Georgios Gousios
            if retry < 5 :
59 9e98ba3c Giorgos Verigakis
                log.exception("Cannot establish connection to AMQP. Retrying...")
60 05e404b8 Georgios Gousios
                time.sleep(1)
61 05e404b8 Georgios Gousios
            else:
62 05e404b8 Georgios Gousios
                raise AMQPError("Queue error")
63 75d20793 Georgios Gousios
    _chan = _conn.channel()
64 4f6e36d9 Georgios Gousios
65 4f6e36d9 Georgios Gousios
66 75d20793 Georgios Gousios
def send(payload, exchange, key):
67 5ad78098 Georgios Gousios
    """
68 464a3579 Georgios Gousios
        Send payload to the specified exchange using the provided routing key.
69 464a3579 Georgios Gousios

70 464a3579 Georgios Gousios
        This method will try reconnecting to the message server if a connection
71 464a3579 Georgios Gousios
        error occurs when sending the message. All other errors are forwarded
72 464a3579 Georgios Gousios
        to the client.
73 5ad78098 Georgios Gousios
    """
74 75d20793 Georgios Gousios
    global _conn, _chan
75 6afc46cb Georgios Gousios
76 6afc46cb Georgios Gousios
    if payload is None:
77 6afc46cb Georgios Gousios
        raise AMQPError("Message is empty")
78 6afc46cb Georgios Gousios
79 6afc46cb Georgios Gousios
    if exchange is None:
80 6afc46cb Georgios Gousios
        raise AMQPError("Unknown exchange")
81 6afc46cb Georgios Gousios
82 6afc46cb Georgios Gousios
    if key is None:
83 6afc46cb Georgios Gousios
        raise AMQPError("Unknown routing key")
84 6afc46cb Georgios Gousios
85 75d20793 Georgios Gousios
    msg = amqp.Message(payload)
86 75d20793 Georgios Gousios
    msg.properties["delivery_mode"] = 2  # Persistent
87 4f6e36d9 Georgios Gousios
88 75d20793 Georgios Gousios
    while True:
89 6afc46cb Georgios Gousios
        try:
90 75d20793 Georgios Gousios
           _chan.basic_publish(msg,
91 75d20793 Georgios Gousios
                               exchange=exchange,
92 75d20793 Georgios Gousios
                               routing_key=key)
93 75d20793 Georgios Gousios
           return
94 f2bdb9ab Georgios Gousios
        except socket.error as se:
95 9e98ba3c Giorgos Verigakis
           log.exception("Server went away, reconnecting...")
96 75d20793 Georgios Gousios
           _connect()
97 6afc46cb Georgios Gousios
        except Exception as e:
98 6afc46cb Georgios Gousios
            if _conn is None:
99 6afc46cb Georgios Gousios
               _connect()
100 6afc46cb Georgios Gousios
            else:
101 9e98ba3c Giorgos Verigakis
                log.exception('Caught unexpected exception (msg: %s)', msg)
102 f2bdb9ab Georgios Gousios
                raise AMQPError("Error sending message to exchange %s with \
103 f2bdb9ab Georgios Gousios
                                key %s.Payload: %s. Error was: %s",
104 f2bdb9ab Georgios Gousios
                                (exchange, key, payload, e.message))
105 f2bdb9ab Georgios Gousios
106 75d20793 Georgios Gousios
107 75d20793 Georgios Gousios
def __init__():
108 75d20793 Georgios Gousios
    _connect()
109 6afc46cb Georgios Gousios
110 6afc46cb Georgios Gousios
class AMQPError(Exception):
111 05e404b8 Georgios Gousios
    pass