root / snf-common / synnefo / lib / amqp_haigha.py @ 5f6ad491
History | View | Annotate | Download (10 kB)
1 | db400d82 | Christos Stavrakakis | # Copyright 2012 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | db400d82 | Christos Stavrakakis | #
|
3 | db400d82 | Christos Stavrakakis | # Redistribution and use in source and binary forms, with or
|
4 | db400d82 | Christos Stavrakakis | # without modification, are permitted provided that the following
|
5 | db400d82 | Christos Stavrakakis | # conditions are met:
|
6 | db400d82 | Christos Stavrakakis | #
|
7 | db400d82 | Christos Stavrakakis | # 1. Redistributions of source code must retain the above
|
8 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
9 | db400d82 | Christos Stavrakakis | # disclaimer.
|
10 | db400d82 | Christos Stavrakakis | #
|
11 | db400d82 | Christos Stavrakakis | # 2. Redistributions in binary form must reproduce the above
|
12 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
13 | db400d82 | Christos Stavrakakis | # disclaimer in the documentation and/or other materials
|
14 | db400d82 | Christos Stavrakakis | # provided with the distribution.
|
15 | db400d82 | Christos Stavrakakis | #
|
16 | db400d82 | Christos Stavrakakis | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | db400d82 | Christos Stavrakakis | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | db400d82 | Christos Stavrakakis | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | db400d82 | Christos Stavrakakis | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | db400d82 | Christos Stavrakakis | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | db400d82 | Christos Stavrakakis | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | db400d82 | Christos Stavrakakis | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | db400d82 | Christos Stavrakakis | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | db400d82 | Christos Stavrakakis | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | db400d82 | Christos Stavrakakis | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | db400d82 | Christos Stavrakakis | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | db400d82 | Christos Stavrakakis | # POSSIBILITY OF SUCH DAMAGE.
|
28 | db400d82 | Christos Stavrakakis | #
|
29 | db400d82 | Christos Stavrakakis | # The views and conclusions contained in the software and
|
30 | db400d82 | Christos Stavrakakis | # documentation are those of the authors and should not be
|
31 | db400d82 | Christos Stavrakakis | # interpreted as representing official policies, either expressed
|
32 | db400d82 | Christos Stavrakakis | # or implied, of GRNET S.A.
|
33 | db400d82 | Christos Stavrakakis | |
34 | db400d82 | Christos Stavrakakis | from haigha.connections import RabbitConnection |
35 | db400d82 | Christos Stavrakakis | from haigha.message import Message |
36 | db400d82 | Christos Stavrakakis | from haigha import exceptions |
37 | db400d82 | Christos Stavrakakis | from random import shuffle |
38 | db400d82 | Christos Stavrakakis | from time import sleep |
39 | db400d82 | Christos Stavrakakis | import logging |
40 | db400d82 | Christos Stavrakakis | import socket |
41 | db400d82 | Christos Stavrakakis | from synnefo import settings |
42 | db400d82 | Christos Stavrakakis | from ordereddict import OrderedDict |
43 | db400d82 | Christos Stavrakakis | import gevent |
44 | db400d82 | Christos Stavrakakis | from gevent import monkey |
45 | db400d82 | Christos Stavrakakis | from functools import wraps |
46 | db400d82 | Christos Stavrakakis | |
47 | db400d82 | Christos Stavrakakis | |
48 | db400d82 | Christos Stavrakakis | logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" )
|
49 | db400d82 | Christos Stavrakakis | logger = logging.getLogger('haigha')
|
50 | db400d82 | Christos Stavrakakis | |
51 | db400d82 | Christos Stavrakakis | sock_opts = { |
52 | db400d82 | Christos Stavrakakis | (socket.IPPROTO_TCP, socket.TCP_NODELAY): 1,
|
53 | db400d82 | Christos Stavrakakis | } |
54 | db400d82 | Christos Stavrakakis | |
55 | db400d82 | Christos Stavrakakis | |
56 | db400d82 | Christos Stavrakakis | def reconnect_decorator(func): |
57 | db400d82 | Christos Stavrakakis | """
|
58 | db400d82 | Christos Stavrakakis | Decorator for persistent connection with one or more AMQP brokers.
|
59 | db400d82 | Christos Stavrakakis |
|
60 | db400d82 | Christos Stavrakakis | """
|
61 | db400d82 | Christos Stavrakakis | @wraps(func)
|
62 | db400d82 | Christos Stavrakakis | def wrapper(self, *args, **kwargs): |
63 | db400d82 | Christos Stavrakakis | try:
|
64 | db400d82 | Christos Stavrakakis | func(self, *args, **kwargs)
|
65 | db400d82 | Christos Stavrakakis | except (socket.error, exceptions.ConnectionError) as e: |
66 | db400d82 | Christos Stavrakakis | logger.error('Connection Closed while in %s: %s', func.__name__, e)
|
67 | db400d82 | Christos Stavrakakis | self.connect()
|
68 | db400d82 | Christos Stavrakakis | |
69 | db400d82 | Christos Stavrakakis | return wrapper
|
70 | db400d82 | Christos Stavrakakis | |
71 | db400d82 | Christos Stavrakakis | |
72 | db400d82 | Christos Stavrakakis | class AMQPHaighaClient(): |
73 | db400d82 | Christos Stavrakakis | def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
74 | db400d82 | Christos Stavrakakis | confirms=True, confirm_buffer=200): |
75 | db400d82 | Christos Stavrakakis | self.hosts = hosts
|
76 | db400d82 | Christos Stavrakakis | shuffle(self.hosts)
|
77 | db400d82 | Christos Stavrakakis | |
78 | db400d82 | Christos Stavrakakis | self.max_retries = max_retries
|
79 | db400d82 | Christos Stavrakakis | self.confirms = confirms
|
80 | db400d82 | Christos Stavrakakis | self.confirm_buffer = confirm_buffer
|
81 | db400d82 | Christos Stavrakakis | |
82 | db400d82 | Christos Stavrakakis | self.connection = None |
83 | db400d82 | Christos Stavrakakis | self.channel = None |
84 | db400d82 | Christos Stavrakakis | self.consumers = {}
|
85 | db400d82 | Christos Stavrakakis | self.unacked = OrderedDict()
|
86 | db400d82 | Christos Stavrakakis | |
87 | db400d82 | Christos Stavrakakis | def connect(self, retries=0): |
88 | db400d82 | Christos Stavrakakis | if retries > self.max_retries: |
89 | db400d82 | Christos Stavrakakis | logger.error("Aborting after %s retries", retries - 1) |
90 | db400d82 | Christos Stavrakakis | raise AMQPConnectionError('Aborting after %d connection failures.'\ |
91 | db400d82 | Christos Stavrakakis | % (retries - 1))
|
92 | db400d82 | Christos Stavrakakis | return
|
93 | db400d82 | Christos Stavrakakis | |
94 | db400d82 | Christos Stavrakakis | # Pick up a host
|
95 | db400d82 | Christos Stavrakakis | host = self.hosts.pop()
|
96 | db400d82 | Christos Stavrakakis | self.hosts.insert(0, host) |
97 | db400d82 | Christos Stavrakakis | |
98 | db400d82 | Christos Stavrakakis | #Patch gevent
|
99 | db400d82 | Christos Stavrakakis | monkey.patch_all() |
100 | db400d82 | Christos Stavrakakis | |
101 | db400d82 | Christos Stavrakakis | try:
|
102 | db400d82 | Christos Stavrakakis | self.connection = \
|
103 | db400d82 | Christos Stavrakakis | RabbitConnection(logger=logger, debug=True,
|
104 | db400d82 | Christos Stavrakakis | user='rabbit', password='r@bb1t', |
105 | db400d82 | Christos Stavrakakis | vhost='/', host=host,
|
106 | db400d82 | Christos Stavrakakis | heartbeat=None,
|
107 | db400d82 | Christos Stavrakakis | sock_opts=sock_opts, |
108 | db400d82 | Christos Stavrakakis | transport='gevent')
|
109 | db400d82 | Christos Stavrakakis | except socket.error as e: |
110 | db400d82 | Christos Stavrakakis | logger.error('Cannot connect to host %s: %s', host, e)
|
111 | db400d82 | Christos Stavrakakis | if retries > 2 * len(self.hosts): |
112 | db400d82 | Christos Stavrakakis | sleep(1)
|
113 | db400d82 | Christos Stavrakakis | return self.connect(retries + 1) |
114 | db400d82 | Christos Stavrakakis | |
115 | db400d82 | Christos Stavrakakis | logger.info('Successfully connected to host: %s', host)
|
116 | db400d82 | Christos Stavrakakis | |
117 | db400d82 | Christos Stavrakakis | logger.info('Creating channel')
|
118 | db400d82 | Christos Stavrakakis | self.channel = self.connection.channel() |
119 | db400d82 | Christos Stavrakakis | |
120 | db400d82 | Christos Stavrakakis | if self.confirms: |
121 | db400d82 | Christos Stavrakakis | self._confirm_select()
|
122 | db400d82 | Christos Stavrakakis | |
123 | db400d82 | Christos Stavrakakis | if self.unacked: |
124 | db400d82 | Christos Stavrakakis | self._resend_unacked_messages()
|
125 | db400d82 | Christos Stavrakakis | |
126 | db400d82 | Christos Stavrakakis | if self.consumers: |
127 | db400d82 | Christos Stavrakakis | for queue, callback in self.consumers.items(): |
128 | db400d82 | Christos Stavrakakis | self.basic_consume(queue, callback)
|
129 | db400d82 | Christos Stavrakakis | |
130 | db400d82 | Christos Stavrakakis | def exchange_declare(self, exchange, type='direct'): |
131 | db400d82 | Christos Stavrakakis | """Declare an exchange
|
132 | db400d82 | Christos Stavrakakis | @type exchange_name: string
|
133 | db400d82 | Christos Stavrakakis | @param exchange_name: name of the exchange
|
134 | db400d82 | Christos Stavrakakis | @type exchange_type: string
|
135 | db400d82 | Christos Stavrakakis | @param exhange_type: one of 'direct', 'topic', 'fanout'
|
136 | db400d82 | Christos Stavrakakis |
|
137 | db400d82 | Christos Stavrakakis | """
|
138 | db400d82 | Christos Stavrakakis | |
139 | db400d82 | Christos Stavrakakis | logger.info('Declaring %s exchange: %s', type, exchange) |
140 | db400d82 | Christos Stavrakakis | self.channel.exchange.declare(exchange, type, |
141 | db400d82 | Christos Stavrakakis | auto_delete=False, durable=True) |
142 | db400d82 | Christos Stavrakakis | |
143 | db400d82 | Christos Stavrakakis | def queue_declare(self, queue, exclusive=False, mirrored=True, |
144 | db400d82 | Christos Stavrakakis | mirrored_nodes='all'):
|
145 | db400d82 | Christos Stavrakakis | """Declare a queue
|
146 | db400d82 | Christos Stavrakakis |
|
147 | db400d82 | Christos Stavrakakis | @type queue: string
|
148 | db400d82 | Christos Stavrakakis | @param queue: name of the queue
|
149 | db400d82 | Christos Stavrakakis | @param mirrored: whether the queue will be mirrored to other brokers
|
150 | db400d82 | Christos Stavrakakis | @param mirrored_nodes: the policy for the mirrored queue.
|
151 | db400d82 | Christos Stavrakakis | Available policies:
|
152 | db400d82 | Christos Stavrakakis | - 'all': The queue is mirrored to all nodes and the
|
153 | db400d82 | Christos Stavrakakis | master node is the one to which the client is
|
154 | db400d82 | Christos Stavrakakis | connected
|
155 | db400d82 | Christos Stavrakakis | - a list of nodes. The queue will be mirrored only to
|
156 | db400d82 | Christos Stavrakakis | the specified nodes, and the master will be the
|
157 | db400d82 | Christos Stavrakakis | first node in the list. Node names must be provided
|
158 | db400d82 | Christos Stavrakakis | and not host IP. example: [node1@rabbit,node2@rabbit]
|
159 | db400d82 | Christos Stavrakakis |
|
160 | db400d82 | Christos Stavrakakis | """
|
161 | db400d82 | Christos Stavrakakis | |
162 | db400d82 | Christos Stavrakakis | logger.info('Declaring queue: %s', queue)
|
163 | db400d82 | Christos Stavrakakis | if mirrored:
|
164 | db400d82 | Christos Stavrakakis | if mirrored_nodes == 'all': |
165 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'all'} |
166 | db400d82 | Christos Stavrakakis | elif isinstance(mirrored_nodes, list): |
167 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'nodes', |
168 | db400d82 | Christos Stavrakakis | 'x-ha-policy-params': mirrored_nodes}
|
169 | db400d82 | Christos Stavrakakis | else:
|
170 | db400d82 | Christos Stavrakakis | raise AttributeError |
171 | db400d82 | Christos Stavrakakis | else:
|
172 | db400d82 | Christos Stavrakakis | arguments = {} |
173 | db400d82 | Christos Stavrakakis | |
174 | db400d82 | Christos Stavrakakis | self.channel.queue.declare(queue, durable=True, exclusive=exclusive, |
175 | db400d82 | Christos Stavrakakis | auto_delete=False, arguments=arguments)
|
176 | db400d82 | Christos Stavrakakis | |
177 | db400d82 | Christos Stavrakakis | def queue_bind(self, queue, exchange, routing_key): |
178 | db400d82 | Christos Stavrakakis | logger.info('Binding queue %s to exchange %s with key %s', queue,
|
179 | db400d82 | Christos Stavrakakis | exchange, routing_key) |
180 | db400d82 | Christos Stavrakakis | self.channel.queue.bind(queue=queue, exchange=exchange,
|
181 | db400d82 | Christos Stavrakakis | routing_key=routing_key) |
182 | db400d82 | Christos Stavrakakis | |
183 | db400d82 | Christos Stavrakakis | def _confirm_select(self): |
184 | db400d82 | Christos Stavrakakis | logger.info('Setting channel to confirm mode')
|
185 | db400d82 | Christos Stavrakakis | self.channel.confirm.select()
|
186 | db400d82 | Christos Stavrakakis | self.channel.basic.set_ack_listener(self._ack_received) |
187 | db400d82 | Christos Stavrakakis | self.channel.basic.set_nack_listener(self._nack_received) |
188 | db400d82 | Christos Stavrakakis | |
189 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
190 | db400d82 | Christos Stavrakakis | def basic_publish(self, exchange, routing_key, body): |
191 | db400d82 | Christos Stavrakakis | msg = Message(body, delivery_mode=2)
|
192 | db400d82 | Christos Stavrakakis | mid = self.channel.basic.publish(msg, exchange, routing_key)
|
193 | db400d82 | Christos Stavrakakis | if self.confirms: |
194 | db400d82 | Christos Stavrakakis | self.unacked[mid] = (exchange, routing_key, body)
|
195 | db400d82 | Christos Stavrakakis | if len(self.unacked) > self.confirm_buffer: |
196 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
197 | db400d82 | Christos Stavrakakis | |
198 | db400d82 | Christos Stavrakakis | logger.debug('Published message %s with id %s', body, mid)
|
199 | db400d82 | Christos Stavrakakis | |
200 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
201 | db400d82 | Christos Stavrakakis | def get_confirms(self): |
202 | db400d82 | Christos Stavrakakis | self.connection.read_frames()
|
203 | db400d82 | Christos Stavrakakis | |
204 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
205 | db400d82 | Christos Stavrakakis | def _resend_unacked_messages(self): |
206 | db400d82 | Christos Stavrakakis | msgs = self.unacked.values()
|
207 | db400d82 | Christos Stavrakakis | self.unacked.clear()
|
208 | db400d82 | Christos Stavrakakis | for exchange, routing_key, body in msgs: |
209 | db400d82 | Christos Stavrakakis | logger.debug('Resending message %s', body)
|
210 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
211 | db400d82 | Christos Stavrakakis | |
212 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
213 | db400d82 | Christos Stavrakakis | def _ack_received(self, mid): |
214 | db400d82 | Christos Stavrakakis | print mid
|
215 | db400d82 | Christos Stavrakakis | logger.debug('Received ACK for message with id %s', mid)
|
216 | db400d82 | Christos Stavrakakis | self.unacked.pop(mid)
|
217 | db400d82 | Christos Stavrakakis | |
218 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
219 | db400d82 | Christos Stavrakakis | def _nack_received(self, mid): |
220 | db400d82 | Christos Stavrakakis | logger.error('Received NACK for message with id %s. Retrying.', mid)
|
221 | db400d82 | Christos Stavrakakis | (exchange, routing_key, body) = self.unacked[mid]
|
222 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
223 | db400d82 | Christos Stavrakakis | |
224 | db400d82 | Christos Stavrakakis | def basic_consume(self, queue, callback): |
225 | db400d82 | Christos Stavrakakis | """Consume from a queue.
|
226 | db400d82 | Christos Stavrakakis |
|
227 | db400d82 | Christos Stavrakakis | @type queue: string or list of strings
|
228 | db400d82 | Christos Stavrakakis | @param queue: the name or list of names from the queues to consume
|
229 | db400d82 | Christos Stavrakakis | @type callback: function
|
230 | db400d82 | Christos Stavrakakis | @param callback: the callback function to run when a message arrives
|
231 | db400d82 | Christos Stavrakakis |
|
232 | db400d82 | Christos Stavrakakis | """
|
233 | db400d82 | Christos Stavrakakis | |
234 | db400d82 | Christos Stavrakakis | self.consumers[queue] = callback
|
235 | db400d82 | Christos Stavrakakis | self.channel.basic.consume(queue, consumer=callback, no_ack=False) |
236 | db400d82 | Christos Stavrakakis | |
237 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
238 | db400d82 | Christos Stavrakakis | def basic_wait(self): |
239 | db400d82 | Christos Stavrakakis | """Wait for messages from the queues declared by basic_consume.
|
240 | db400d82 | Christos Stavrakakis |
|
241 | db400d82 | Christos Stavrakakis | This function will block until a message arrives from the queues that
|
242 | db400d82 | Christos Stavrakakis | have been declared with basic_consume. If the optional arguments
|
243 | db400d82 | Christos Stavrakakis | 'promise' is given, only messages for this promise will be delivered.
|
244 | db400d82 | Christos Stavrakakis |
|
245 | db400d82 | Christos Stavrakakis | """
|
246 | db400d82 | Christos Stavrakakis | |
247 | db400d82 | Christos Stavrakakis | self.connection.read_frames()
|
248 | db400d82 | Christos Stavrakakis | gevent.sleep(0)
|
249 | db400d82 | Christos Stavrakakis | |
250 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
251 | db400d82 | Christos Stavrakakis | def basic_get(self, queue): |
252 | db400d82 | Christos Stavrakakis | self.channel.basic.get(queue, no_ack=False) |
253 | db400d82 | Christos Stavrakakis | |
254 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
255 | db400d82 | Christos Stavrakakis | def basic_ack(self, message): |
256 | db400d82 | Christos Stavrakakis | delivery_tag = message.delivery_info['delivery_tag']
|
257 | db400d82 | Christos Stavrakakis | self.channel.basic.ack(delivery_tag)
|
258 | db400d82 | Christos Stavrakakis | |
259 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
260 | db400d82 | Christos Stavrakakis | def basic_nack(self, message): |
261 | db400d82 | Christos Stavrakakis | delivery_tag = message.delivery_info['delivery_tag']
|
262 | db400d82 | Christos Stavrakakis | self.channel.basic.ack(delivery_tag)
|
263 | db400d82 | Christos Stavrakakis | |
264 | db400d82 | Christos Stavrakakis | def close(self): |
265 | db400d82 | Christos Stavrakakis | try:
|
266 | db400d82 | Christos Stavrakakis | if self.confirms: |
267 | db400d82 | Christos Stavrakakis | while self.unacked: |
268 | db400d82 | Christos Stavrakakis | print self.unacked |
269 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
270 | db400d82 | Christos Stavrakakis | self.channel.close()
|
271 | db400d82 | Christos Stavrakakis | close_info = self.channel.close_info
|
272 | db400d82 | Christos Stavrakakis | logger.info('Successfully closed channel. Info: %s', close_info)
|
273 | db400d82 | Christos Stavrakakis | self.connection.close()
|
274 | db400d82 | Christos Stavrakakis | except socket.error as e: |
275 | db400d82 | Christos Stavrakakis | logger.error('Connection closed while closing connection:%s',
|
276 | db400d82 | Christos Stavrakakis | e) |
277 | db400d82 | Christos Stavrakakis | |
278 | db400d82 | Christos Stavrakakis | def queue_delete(self, queue, if_unused=True, if_empty=True): |
279 | db400d82 | Christos Stavrakakis | self.channel.queue.delete(queue, if_unused, if_empty)
|
280 | db400d82 | Christos Stavrakakis | |
281 | db400d82 | Christos Stavrakakis | def exchange_delete(self, exchange, if_unused=True): |
282 | db400d82 | Christos Stavrakakis | self.channel.exchange.delete(exchange, if_unused)
|
283 | db400d82 | Christos Stavrakakis | |
284 | db400d82 | Christos Stavrakakis | def basic_class(self): |
285 | db400d82 | Christos Stavrakakis | pass
|
286 | db400d82 | Christos Stavrakakis | |
287 | db400d82 | Christos Stavrakakis | |
288 | db400d82 | Christos Stavrakakis | class AMQPConnectionError(): |
289 | db400d82 | Christos Stavrakakis | pass |