root / snf-cyclades-app / synnefo / logic / amqp_connection.py @ 3c755209
History | View | Annotate | Download (3.8 kB)
1 | 464a3579 | Georgios Gousios | # Copyright 2011 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | 464a3579 | Georgios Gousios | #
|
3 | 464a3579 | Georgios Gousios | # Redistribution and use in source and binary forms, with or without
|
4 | 464a3579 | Georgios Gousios | # modification, are permitted provided that the following conditions
|
5 | 464a3579 | Georgios Gousios | # are met:
|
6 | 464a3579 | Georgios Gousios | #
|
7 | 464a3579 | Georgios Gousios | # 1. Redistributions of source code must retain the above copyright
|
8 | 464a3579 | Georgios Gousios | # notice, this list of conditions and the following disclaimer.
|
9 | 464a3579 | Georgios Gousios | #
|
10 | 464a3579 | Georgios Gousios | # 2. Redistributions in binary form must reproduce the above copyright
|
11 | 464a3579 | Georgios Gousios | # notice, this list of conditions and the following disclaimer in the
|
12 | 464a3579 | Georgios Gousios | # documentation and/or other materials provided with the distribution.
|
13 | 464a3579 | Georgios Gousios | #
|
14 | 464a3579 | Georgios Gousios | # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
15 | 464a3579 | Georgios Gousios | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
16 | 464a3579 | Georgios Gousios | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
17 | 464a3579 | Georgios Gousios | # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
18 | 464a3579 | Georgios Gousios | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
19 | 464a3579 | Georgios Gousios | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
20 | 464a3579 | Georgios Gousios | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
21 | 464a3579 | Georgios Gousios | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
22 | 464a3579 | Georgios Gousios | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
23 | 464a3579 | Georgios Gousios | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
24 | 464a3579 | Georgios Gousios | # SUCH DAMAGE.
|
25 | 464a3579 | Georgios Gousios | #
|
26 | 464a3579 | Georgios Gousios | # The views and conclusions contained in the software and documentation are
|
27 | 464a3579 | Georgios Gousios | # those of the authors and should not be interpreted as representing official
|
28 | 464a3579 | Georgios Gousios | # policies, either expressed or implied, of GRNET S.A.
|
29 | 464a3579 | Georgios Gousios | |
30 | 4f6e36d9 | Georgios Gousios | import time |
31 | 75d20793 | Georgios Gousios | import socket |
32 | 2035039b | Giorgos Verigakis | |
33 | 2035039b | Giorgos Verigakis | from logging import getLogger |
34 | 2035039b | Giorgos Verigakis | |
35 | 4f6e36d9 | Georgios Gousios | from amqplib import client_0_8 as amqp |
36 | 4f6e36d9 | Georgios Gousios | from django.conf import settings |
37 | 9e98ba3c | Giorgos Verigakis | |
38 | 9e98ba3c | Giorgos Verigakis | |
39 | 9e98ba3c | Giorgos Verigakis | log = getLogger('synnefo.logic')
|
40 | 4f6e36d9 | Georgios Gousios | |
41 | 75d20793 | Georgios Gousios | _conn = None
|
42 | 75d20793 | Georgios Gousios | _chan = None
|
43 | 9e98ba3c | Giorgos Verigakis | |
44 | 4f6e36d9 | Georgios Gousios | |
45 | 75d20793 | Georgios Gousios | def _connect(): |
46 | 75d20793 | Georgios Gousios | global _conn, _chan
|
47 | 29053267 | Georgios Gousios | # Force the _conn object to re-initialize
|
48 | 29053267 | Georgios Gousios | _conn = None
|
49 | 05e404b8 | Georgios Gousios | retry = 0
|
50 | 75d20793 | Georgios Gousios | while _conn == None: |
51 | 75d20793 | Georgios Gousios | try:
|
52 | 75d20793 | Georgios Gousios | _conn = amqp.Connection(host=settings.RABBIT_HOST, |
53 | 75d20793 | Georgios Gousios | userid=settings.RABBIT_USERNAME, |
54 | 75d20793 | Georgios Gousios | password=settings.RABBIT_PASSWORD, |
55 | 75d20793 | Georgios Gousios | virtual_host=settings.RABBIT_VHOST) |
56 | 75d20793 | Georgios Gousios | except socket.error:
|
57 | 05e404b8 | Georgios Gousios | retry += 1
|
58 | 05e404b8 | Georgios Gousios | if retry < 5 : |
59 | 9e98ba3c | Giorgos Verigakis | log.exception("Cannot establish connection to AMQP. Retrying...")
|
60 | 05e404b8 | Georgios Gousios | time.sleep(1)
|
61 | 05e404b8 | Georgios Gousios | else:
|
62 | 05e404b8 | Georgios Gousios | raise AMQPError("Queue error") |
63 | 75d20793 | Georgios Gousios | _chan = _conn.channel() |
64 | 4f6e36d9 | Georgios Gousios | |
65 | 4f6e36d9 | Georgios Gousios | |
66 | 75d20793 | Georgios Gousios | def send(payload, exchange, key): |
67 | 5ad78098 | Georgios Gousios | """
|
68 | 464a3579 | Georgios Gousios | Send payload to the specified exchange using the provided routing key.
|
69 | 464a3579 | Georgios Gousios |
|
70 | 464a3579 | Georgios Gousios | This method will try reconnecting to the message server if a connection
|
71 | 464a3579 | Georgios Gousios | error occurs when sending the message. All other errors are forwarded
|
72 | 464a3579 | Georgios Gousios | to the client.
|
73 | 5ad78098 | Georgios Gousios | """
|
74 | 75d20793 | Georgios Gousios | global _conn, _chan
|
75 | 6afc46cb | Georgios Gousios | |
76 | 6afc46cb | Georgios Gousios | if payload is None: |
77 | 6afc46cb | Georgios Gousios | raise AMQPError("Message is empty") |
78 | 6afc46cb | Georgios Gousios | |
79 | 6afc46cb | Georgios Gousios | if exchange is None: |
80 | 6afc46cb | Georgios Gousios | raise AMQPError("Unknown exchange") |
81 | 6afc46cb | Georgios Gousios | |
82 | 6afc46cb | Georgios Gousios | if key is None: |
83 | 6afc46cb | Georgios Gousios | raise AMQPError("Unknown routing key") |
84 | 6afc46cb | Georgios Gousios | |
85 | 75d20793 | Georgios Gousios | msg = amqp.Message(payload) |
86 | 75d20793 | Georgios Gousios | msg.properties["delivery_mode"] = 2 # Persistent |
87 | 4f6e36d9 | Georgios Gousios | |
88 | 75d20793 | Georgios Gousios | while True: |
89 | 6afc46cb | Georgios Gousios | try:
|
90 | 75d20793 | Georgios Gousios | _chan.basic_publish(msg, |
91 | 75d20793 | Georgios Gousios | exchange=exchange, |
92 | 75d20793 | Georgios Gousios | routing_key=key) |
93 | 75d20793 | Georgios Gousios | return
|
94 | f2bdb9ab | Georgios Gousios | except socket.error as se: |
95 | 9e98ba3c | Giorgos Verigakis | log.exception("Server went away, reconnecting...")
|
96 | 75d20793 | Georgios Gousios | _connect() |
97 | 6afc46cb | Georgios Gousios | except Exception as e: |
98 | 6afc46cb | Georgios Gousios | if _conn is None: |
99 | 6afc46cb | Georgios Gousios | _connect() |
100 | 6afc46cb | Georgios Gousios | else:
|
101 | 9e98ba3c | Giorgos Verigakis | log.exception('Caught unexpected exception (msg: %s)', msg)
|
102 | f2bdb9ab | Georgios Gousios | raise AMQPError("Error sending message to exchange %s with \ |
103 | f2bdb9ab | Georgios Gousios | key %s.Payload: %s. Error was: %s",
|
104 | f2bdb9ab | Georgios Gousios | (exchange, key, payload, e.message)) |
105 | f2bdb9ab | Georgios Gousios | |
106 | 75d20793 | Georgios Gousios | |
107 | 75d20793 | Georgios Gousios | def __init__(): |
108 | 75d20793 | Georgios Gousios | _connect() |
109 | 6afc46cb | Georgios Gousios | |
110 | 6afc46cb | Georgios Gousios | class AMQPError(Exception): |
111 | 05e404b8 | Georgios Gousios | pass |