Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_haigha.py @ 5f6ad491

History | View | Annotate | Download (10 kB)

1 db400d82 Christos Stavrakakis
# Copyright 2012 GRNET S.A. All rights reserved.
2 db400d82 Christos Stavrakakis
#
3 db400d82 Christos Stavrakakis
# Redistribution and use in source and binary forms, with or
4 db400d82 Christos Stavrakakis
# without modification, are permitted provided that the following
5 db400d82 Christos Stavrakakis
# conditions are met:
6 db400d82 Christos Stavrakakis
#
7 db400d82 Christos Stavrakakis
#   1. Redistributions of source code must retain the above
8 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
9 db400d82 Christos Stavrakakis
#      disclaimer.
10 db400d82 Christos Stavrakakis
#
11 db400d82 Christos Stavrakakis
#   2. Redistributions in binary form must reproduce the above
12 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
13 db400d82 Christos Stavrakakis
#      disclaimer in the documentation and/or other materials
14 db400d82 Christos Stavrakakis
#      provided with the distribution.
15 db400d82 Christos Stavrakakis
#
16 db400d82 Christos Stavrakakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 db400d82 Christos Stavrakakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 db400d82 Christos Stavrakakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 db400d82 Christos Stavrakakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 db400d82 Christos Stavrakakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 db400d82 Christos Stavrakakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 db400d82 Christos Stavrakakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 db400d82 Christos Stavrakakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 db400d82 Christos Stavrakakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 db400d82 Christos Stavrakakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 db400d82 Christos Stavrakakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 db400d82 Christos Stavrakakis
# POSSIBILITY OF SUCH DAMAGE.
28 db400d82 Christos Stavrakakis
#
29 db400d82 Christos Stavrakakis
# The views and conclusions contained in the software and
30 db400d82 Christos Stavrakakis
# documentation are those of the authors and should not be
31 db400d82 Christos Stavrakakis
# interpreted as representing official policies, either expressed
32 db400d82 Christos Stavrakakis
# or implied, of GRNET S.A.
33 db400d82 Christos Stavrakakis
34 db400d82 Christos Stavrakakis
from haigha.connections import RabbitConnection
35 db400d82 Christos Stavrakakis
from haigha.message import Message
36 db400d82 Christos Stavrakakis
from haigha import exceptions
37 db400d82 Christos Stavrakakis
from random import shuffle
38 db400d82 Christos Stavrakakis
from time import sleep
39 db400d82 Christos Stavrakakis
import logging
40 db400d82 Christos Stavrakakis
import socket
41 db400d82 Christos Stavrakakis
from synnefo import settings
42 db400d82 Christos Stavrakakis
from ordereddict import OrderedDict
43 db400d82 Christos Stavrakakis
import gevent
44 db400d82 Christos Stavrakakis
from gevent import monkey
45 db400d82 Christos Stavrakakis
from functools import wraps
46 db400d82 Christos Stavrakakis
47 db400d82 Christos Stavrakakis
48 db400d82 Christos Stavrakakis
logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" )
49 db400d82 Christos Stavrakakis
logger = logging.getLogger('haigha')
50 db400d82 Christos Stavrakakis
51 db400d82 Christos Stavrakakis
sock_opts = {
52 db400d82 Christos Stavrakakis
  (socket.IPPROTO_TCP, socket.TCP_NODELAY): 1,
53 db400d82 Christos Stavrakakis
}
54 db400d82 Christos Stavrakakis
55 db400d82 Christos Stavrakakis
56 db400d82 Christos Stavrakakis
def reconnect_decorator(func):
57 db400d82 Christos Stavrakakis
    """
58 db400d82 Christos Stavrakakis
    Decorator for persistent connection with one or more AMQP brokers.
59 db400d82 Christos Stavrakakis

60 db400d82 Christos Stavrakakis
    """
61 db400d82 Christos Stavrakakis
    @wraps(func)
62 db400d82 Christos Stavrakakis
    def wrapper(self, *args, **kwargs):
63 db400d82 Christos Stavrakakis
        try:
64 db400d82 Christos Stavrakakis
            func(self, *args, **kwargs)
65 db400d82 Christos Stavrakakis
        except (socket.error, exceptions.ConnectionError) as e:
66 db400d82 Christos Stavrakakis
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
67 db400d82 Christos Stavrakakis
            self.connect()
68 db400d82 Christos Stavrakakis
69 db400d82 Christos Stavrakakis
    return wrapper
70 db400d82 Christos Stavrakakis
71 db400d82 Christos Stavrakakis
72 db400d82 Christos Stavrakakis
class AMQPHaighaClient():
73 db400d82 Christos Stavrakakis
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
74 db400d82 Christos Stavrakakis
                 confirms=True, confirm_buffer=200):
75 db400d82 Christos Stavrakakis
        self.hosts = hosts
76 db400d82 Christos Stavrakakis
        shuffle(self.hosts)
77 db400d82 Christos Stavrakakis
78 db400d82 Christos Stavrakakis
        self.max_retries = max_retries
79 db400d82 Christos Stavrakakis
        self.confirms = confirms
80 db400d82 Christos Stavrakakis
        self.confirm_buffer = confirm_buffer
81 db400d82 Christos Stavrakakis
82 db400d82 Christos Stavrakakis
        self.connection = None
83 db400d82 Christos Stavrakakis
        self.channel = None
84 db400d82 Christos Stavrakakis
        self.consumers = {}
85 db400d82 Christos Stavrakakis
        self.unacked = OrderedDict()
86 db400d82 Christos Stavrakakis
87 db400d82 Christos Stavrakakis
    def connect(self, retries=0):
88 db400d82 Christos Stavrakakis
        if retries > self.max_retries:
89 db400d82 Christos Stavrakakis
            logger.error("Aborting after %s retries", retries - 1)
90 db400d82 Christos Stavrakakis
            raise AMQPConnectionError('Aborting after %d connection failures.'\
91 db400d82 Christos Stavrakakis
                                      % (retries - 1))
92 db400d82 Christos Stavrakakis
            return
93 db400d82 Christos Stavrakakis
94 db400d82 Christos Stavrakakis
        # Pick up a host
95 db400d82 Christos Stavrakakis
        host = self.hosts.pop()
96 db400d82 Christos Stavrakakis
        self.hosts.insert(0, host)
97 db400d82 Christos Stavrakakis
98 db400d82 Christos Stavrakakis
        #Patch gevent
99 db400d82 Christos Stavrakakis
        monkey.patch_all()
100 db400d82 Christos Stavrakakis
101 db400d82 Christos Stavrakakis
        try:
102 db400d82 Christos Stavrakakis
            self.connection = \
103 db400d82 Christos Stavrakakis
                 RabbitConnection(logger=logger, debug=True,
104 db400d82 Christos Stavrakakis
                      user='rabbit', password='r@bb1t',
105 db400d82 Christos Stavrakakis
                      vhost='/', host=host,
106 db400d82 Christos Stavrakakis
                      heartbeat=None,
107 db400d82 Christos Stavrakakis
                      sock_opts=sock_opts,
108 db400d82 Christos Stavrakakis
                      transport='gevent')
109 db400d82 Christos Stavrakakis
        except socket.error as e:
110 db400d82 Christos Stavrakakis
            logger.error('Cannot connect to host %s: %s', host, e)
111 db400d82 Christos Stavrakakis
            if retries > 2 * len(self.hosts):
112 db400d82 Christos Stavrakakis
                sleep(1)
113 db400d82 Christos Stavrakakis
            return self.connect(retries + 1)
114 db400d82 Christos Stavrakakis
115 db400d82 Christos Stavrakakis
        logger.info('Successfully connected to host: %s', host)
116 db400d82 Christos Stavrakakis
117 db400d82 Christos Stavrakakis
        logger.info('Creating channel')
118 db400d82 Christos Stavrakakis
        self.channel = self.connection.channel()
119 db400d82 Christos Stavrakakis
120 db400d82 Christos Stavrakakis
        if self.confirms:
121 db400d82 Christos Stavrakakis
            self._confirm_select()
122 db400d82 Christos Stavrakakis
123 db400d82 Christos Stavrakakis
        if self.unacked:
124 db400d82 Christos Stavrakakis
            self._resend_unacked_messages()
125 db400d82 Christos Stavrakakis
126 db400d82 Christos Stavrakakis
        if self.consumers:
127 db400d82 Christos Stavrakakis
            for queue, callback in self.consumers.items():
128 db400d82 Christos Stavrakakis
                self.basic_consume(queue, callback)
129 db400d82 Christos Stavrakakis
130 db400d82 Christos Stavrakakis
    def exchange_declare(self, exchange, type='direct'):
131 db400d82 Christos Stavrakakis
        """Declare an exchange
132 db400d82 Christos Stavrakakis
        @type exchange_name: string
133 db400d82 Christos Stavrakakis
        @param exchange_name: name of the exchange
134 db400d82 Christos Stavrakakis
        @type exchange_type: string
135 db400d82 Christos Stavrakakis
        @param exhange_type: one of 'direct', 'topic', 'fanout'
136 db400d82 Christos Stavrakakis

137 db400d82 Christos Stavrakakis
        """
138 db400d82 Christos Stavrakakis
139 db400d82 Christos Stavrakakis
        logger.info('Declaring %s exchange: %s', type, exchange)
140 db400d82 Christos Stavrakakis
        self.channel.exchange.declare(exchange, type,
141 db400d82 Christos Stavrakakis
                                      auto_delete=False, durable=True)
142 db400d82 Christos Stavrakakis
143 db400d82 Christos Stavrakakis
    def queue_declare(self, queue, exclusive=False, mirrored=True,
144 db400d82 Christos Stavrakakis
                      mirrored_nodes='all'):
145 db400d82 Christos Stavrakakis
        """Declare a queue
146 db400d82 Christos Stavrakakis

147 db400d82 Christos Stavrakakis
        @type queue: string
148 db400d82 Christos Stavrakakis
        @param queue: name of the queue
149 db400d82 Christos Stavrakakis
        @param mirrored: whether the queue will be mirrored to other brokers
150 db400d82 Christos Stavrakakis
        @param mirrored_nodes: the policy for the mirrored queue.
151 db400d82 Christos Stavrakakis
            Available policies:
152 db400d82 Christos Stavrakakis
                - 'all': The queue is mirrored to all nodes and the
153 db400d82 Christos Stavrakakis
                  master node is the one to which the client is
154 db400d82 Christos Stavrakakis
                  connected
155 db400d82 Christos Stavrakakis
                - a list of nodes. The queue will be mirrored only to
156 db400d82 Christos Stavrakakis
                  the specified nodes, and the master will be the
157 db400d82 Christos Stavrakakis
                  first node in the list. Node names must be provided
158 db400d82 Christos Stavrakakis
                  and not host IP. example: [node1@rabbit,node2@rabbit]
159 db400d82 Christos Stavrakakis

160 db400d82 Christos Stavrakakis
        """
161 db400d82 Christos Stavrakakis
162 db400d82 Christos Stavrakakis
        logger.info('Declaring queue: %s', queue)
163 db400d82 Christos Stavrakakis
        if mirrored:
164 db400d82 Christos Stavrakakis
            if mirrored_nodes == 'all':
165 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'all'}
166 db400d82 Christos Stavrakakis
            elif isinstance(mirrored_nodes, list):
167 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'nodes',
168 db400d82 Christos Stavrakakis
                             'x-ha-policy-params': mirrored_nodes}
169 db400d82 Christos Stavrakakis
            else:
170 db400d82 Christos Stavrakakis
                raise AttributeError
171 db400d82 Christos Stavrakakis
        else:
172 db400d82 Christos Stavrakakis
            arguments = {}
173 db400d82 Christos Stavrakakis
174 db400d82 Christos Stavrakakis
        self.channel.queue.declare(queue, durable=True, exclusive=exclusive,
175 db400d82 Christos Stavrakakis
                                   auto_delete=False, arguments=arguments)
176 db400d82 Christos Stavrakakis
177 db400d82 Christos Stavrakakis
    def queue_bind(self, queue, exchange, routing_key):
178 db400d82 Christos Stavrakakis
        logger.info('Binding queue %s to exchange %s with key %s', queue,
179 db400d82 Christos Stavrakakis
                    exchange, routing_key)
180 db400d82 Christos Stavrakakis
        self.channel.queue.bind(queue=queue, exchange=exchange,
181 db400d82 Christos Stavrakakis
                                routing_key=routing_key)
182 db400d82 Christos Stavrakakis
183 db400d82 Christos Stavrakakis
    def _confirm_select(self):
184 db400d82 Christos Stavrakakis
        logger.info('Setting channel to confirm mode')
185 db400d82 Christos Stavrakakis
        self.channel.confirm.select()
186 db400d82 Christos Stavrakakis
        self.channel.basic.set_ack_listener(self._ack_received)
187 db400d82 Christos Stavrakakis
        self.channel.basic.set_nack_listener(self._nack_received)
188 db400d82 Christos Stavrakakis
189 db400d82 Christos Stavrakakis
    @reconnect_decorator
190 db400d82 Christos Stavrakakis
    def basic_publish(self, exchange, routing_key, body):
191 db400d82 Christos Stavrakakis
        msg = Message(body, delivery_mode=2)
192 db400d82 Christos Stavrakakis
        mid = self.channel.basic.publish(msg, exchange, routing_key)
193 db400d82 Christos Stavrakakis
        if self.confirms:
194 db400d82 Christos Stavrakakis
            self.unacked[mid] = (exchange, routing_key, body)
195 db400d82 Christos Stavrakakis
            if len(self.unacked) > self.confirm_buffer:
196 db400d82 Christos Stavrakakis
                self.get_confirms()
197 db400d82 Christos Stavrakakis
198 db400d82 Christos Stavrakakis
        logger.debug('Published message %s with id %s', body, mid)
199 db400d82 Christos Stavrakakis
200 db400d82 Christos Stavrakakis
    @reconnect_decorator
201 db400d82 Christos Stavrakakis
    def get_confirms(self):
202 db400d82 Christos Stavrakakis
        self.connection.read_frames()
203 db400d82 Christos Stavrakakis
204 db400d82 Christos Stavrakakis
    @reconnect_decorator
205 db400d82 Christos Stavrakakis
    def _resend_unacked_messages(self):
206 db400d82 Christos Stavrakakis
        msgs = self.unacked.values()
207 db400d82 Christos Stavrakakis
        self.unacked.clear()
208 db400d82 Christos Stavrakakis
        for exchange, routing_key, body in msgs:
209 db400d82 Christos Stavrakakis
            logger.debug('Resending message %s', body)
210 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
211 db400d82 Christos Stavrakakis
212 db400d82 Christos Stavrakakis
    @reconnect_decorator
213 db400d82 Christos Stavrakakis
    def _ack_received(self, mid):
214 db400d82 Christos Stavrakakis
        print mid
215 db400d82 Christos Stavrakakis
        logger.debug('Received ACK for message with id %s', mid)
216 db400d82 Christos Stavrakakis
        self.unacked.pop(mid)
217 db400d82 Christos Stavrakakis
218 db400d82 Christos Stavrakakis
    @reconnect_decorator
219 db400d82 Christos Stavrakakis
    def _nack_received(self, mid):
220 db400d82 Christos Stavrakakis
        logger.error('Received NACK for message with id %s. Retrying.', mid)
221 db400d82 Christos Stavrakakis
        (exchange, routing_key, body) = self.unacked[mid]
222 db400d82 Christos Stavrakakis
        self.basic_publish(exchange, routing_key, body)
223 db400d82 Christos Stavrakakis
224 db400d82 Christos Stavrakakis
    def basic_consume(self, queue, callback):
225 db400d82 Christos Stavrakakis
        """Consume from a queue.
226 db400d82 Christos Stavrakakis

227 db400d82 Christos Stavrakakis
        @type queue: string or list of strings
228 db400d82 Christos Stavrakakis
        @param queue: the name or list of names from the queues to consume
229 db400d82 Christos Stavrakakis
        @type callback: function
230 db400d82 Christos Stavrakakis
        @param callback: the callback function to run when a message arrives
231 db400d82 Christos Stavrakakis

232 db400d82 Christos Stavrakakis
        """
233 db400d82 Christos Stavrakakis
234 db400d82 Christos Stavrakakis
        self.consumers[queue] = callback
235 db400d82 Christos Stavrakakis
        self.channel.basic.consume(queue, consumer=callback, no_ack=False)
236 db400d82 Christos Stavrakakis
237 db400d82 Christos Stavrakakis
    @reconnect_decorator
238 db400d82 Christos Stavrakakis
    def basic_wait(self):
239 db400d82 Christos Stavrakakis
        """Wait for messages from the queues declared by basic_consume.
240 db400d82 Christos Stavrakakis

241 db400d82 Christos Stavrakakis
        This function will block until a message arrives from the queues that
242 db400d82 Christos Stavrakakis
        have been declared with basic_consume. If the optional arguments
243 db400d82 Christos Stavrakakis
        'promise' is given, only messages for this promise will be delivered.
244 db400d82 Christos Stavrakakis

245 db400d82 Christos Stavrakakis
        """
246 db400d82 Christos Stavrakakis
247 db400d82 Christos Stavrakakis
        self.connection.read_frames()
248 db400d82 Christos Stavrakakis
        gevent.sleep(0)
249 db400d82 Christos Stavrakakis
250 db400d82 Christos Stavrakakis
    @reconnect_decorator
251 db400d82 Christos Stavrakakis
    def basic_get(self, queue):
252 db400d82 Christos Stavrakakis
        self.channel.basic.get(queue, no_ack=False)
253 db400d82 Christos Stavrakakis
254 db400d82 Christos Stavrakakis
    @reconnect_decorator
255 db400d82 Christos Stavrakakis
    def basic_ack(self, message):
256 db400d82 Christos Stavrakakis
        delivery_tag = message.delivery_info['delivery_tag']
257 db400d82 Christos Stavrakakis
        self.channel.basic.ack(delivery_tag)
258 db400d82 Christos Stavrakakis
259 db400d82 Christos Stavrakakis
    @reconnect_decorator
260 db400d82 Christos Stavrakakis
    def basic_nack(self, message):
261 db400d82 Christos Stavrakakis
        delivery_tag = message.delivery_info['delivery_tag']
262 db400d82 Christos Stavrakakis
        self.channel.basic.ack(delivery_tag)
263 db400d82 Christos Stavrakakis
264 db400d82 Christos Stavrakakis
    def close(self):
265 db400d82 Christos Stavrakakis
        try:
266 db400d82 Christos Stavrakakis
            if self.confirms:
267 db400d82 Christos Stavrakakis
                while self.unacked:
268 db400d82 Christos Stavrakakis
                    print self.unacked
269 db400d82 Christos Stavrakakis
                    self.get_confirms()
270 db400d82 Christos Stavrakakis
            self.channel.close()
271 db400d82 Christos Stavrakakis
            close_info = self.channel.close_info
272 db400d82 Christos Stavrakakis
            logger.info('Successfully closed channel. Info: %s', close_info)
273 db400d82 Christos Stavrakakis
            self.connection.close()
274 db400d82 Christos Stavrakakis
        except socket.error as e:
275 db400d82 Christos Stavrakakis
            logger.error('Connection closed while closing connection:%s',
276 db400d82 Christos Stavrakakis
                          e)
277 db400d82 Christos Stavrakakis
278 db400d82 Christos Stavrakakis
    def queue_delete(self, queue, if_unused=True, if_empty=True):
279 db400d82 Christos Stavrakakis
        self.channel.queue.delete(queue, if_unused, if_empty)
280 db400d82 Christos Stavrakakis
281 db400d82 Christos Stavrakakis
    def exchange_delete(self, exchange, if_unused=True):
282 db400d82 Christos Stavrakakis
        self.channel.exchange.delete(exchange, if_unused)
283 db400d82 Christos Stavrakakis
284 db400d82 Christos Stavrakakis
    def basic_class(self):
285 db400d82 Christos Stavrakakis
        pass
286 db400d82 Christos Stavrakakis
287 db400d82 Christos Stavrakakis
288 db400d82 Christos Stavrakakis
class  AMQPConnectionError():
289 db400d82 Christos Stavrakakis
    pass