root / snf-common / synnefo / lib / amqp / amqp_puka.py @ 2e26966f
History | View | Annotate | Download (14.9 kB)
1 | db400d82 | Christos Stavrakakis | # Copyright 2012 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | db400d82 | Christos Stavrakakis | #
|
3 | db400d82 | Christos Stavrakakis | # Redistribution and use in source and binary forms, with or
|
4 | db400d82 | Christos Stavrakakis | # without modification, are permitted provided that the following
|
5 | db400d82 | Christos Stavrakakis | # conditions are met:
|
6 | db400d82 | Christos Stavrakakis | #
|
7 | db400d82 | Christos Stavrakakis | # 1. Redistributions of source code must retain the above
|
8 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
9 | db400d82 | Christos Stavrakakis | # disclaimer.
|
10 | db400d82 | Christos Stavrakakis | #
|
11 | db400d82 | Christos Stavrakakis | # 2. Redistributions in binary form must reproduce the above
|
12 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
13 | db400d82 | Christos Stavrakakis | # disclaimer in the documentation and/or other materials
|
14 | db400d82 | Christos Stavrakakis | # provided with the distribution.
|
15 | db400d82 | Christos Stavrakakis | #
|
16 | db400d82 | Christos Stavrakakis | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | db400d82 | Christos Stavrakakis | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | db400d82 | Christos Stavrakakis | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | db400d82 | Christos Stavrakakis | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | db400d82 | Christos Stavrakakis | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | db400d82 | Christos Stavrakakis | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | db400d82 | Christos Stavrakakis | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | db400d82 | Christos Stavrakakis | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | db400d82 | Christos Stavrakakis | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | db400d82 | Christos Stavrakakis | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | db400d82 | Christos Stavrakakis | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | db400d82 | Christos Stavrakakis | # POSSIBILITY OF SUCH DAMAGE.
|
28 | db400d82 | Christos Stavrakakis | #
|
29 | db400d82 | Christos Stavrakakis | # The views and conclusions contained in the software and
|
30 | db400d82 | Christos Stavrakakis | # documentation are those of the authors and should not be
|
31 | db400d82 | Christos Stavrakakis | # interpreted as representing official policies, either expressed
|
32 | db400d82 | Christos Stavrakakis | # or implied, of GRNET S.A.
|
33 | db400d82 | Christos Stavrakakis | |
34 | db400d82 | Christos Stavrakakis | """ Module implementing connection and communication with an AMQP broker.
|
35 | db400d82 | Christos Stavrakakis |
|
36 | db400d82 | Christos Stavrakakis | AMQP Client's implemented by this module silenty detect connection failures and
|
37 | db400d82 | Christos Stavrakakis | try to reconnect to any available broker. Also publishing takes advantage of
|
38 | db400d82 | Christos Stavrakakis | publisher-confirms in order to guarantee that messages are properly delivered
|
39 | db400d82 | Christos Stavrakakis | to the broker.
|
40 | db400d82 | Christos Stavrakakis |
|
41 | db400d82 | Christos Stavrakakis | """
|
42 | db400d82 | Christos Stavrakakis | |
43 | db400d82 | Christos Stavrakakis | import logging |
44 | db400d82 | Christos Stavrakakis | |
45 | db400d82 | Christos Stavrakakis | from puka import Client |
46 | db400d82 | Christos Stavrakakis | from puka import spec_exceptions |
47 | 6d27eadd | Christos Stavrakakis | import socket |
48 | db400d82 | Christos Stavrakakis | from socket import error as socket_error |
49 | db400d82 | Christos Stavrakakis | from time import sleep |
50 | db400d82 | Christos Stavrakakis | from random import shuffle |
51 | db400d82 | Christos Stavrakakis | from functools import wraps |
52 | 7627ec6f | Christos Stavrakakis | from synnefo.lib.ordereddict import OrderedDict |
53 | db400d82 | Christos Stavrakakis | from synnefo import settings |
54 | db400d82 | Christos Stavrakakis | |
55 | db400d82 | Christos Stavrakakis | |
56 | db400d82 | Christos Stavrakakis | def reconnect_decorator(func): |
57 | db400d82 | Christos Stavrakakis | """
|
58 | db400d82 | Christos Stavrakakis | Decorator for persistent connection with one or more AMQP brokers.
|
59 | db400d82 | Christos Stavrakakis |
|
60 | db400d82 | Christos Stavrakakis | """
|
61 | db400d82 | Christos Stavrakakis | @wraps(func)
|
62 | db400d82 | Christos Stavrakakis | def wrapper(self, *args, **kwargs): |
63 | db400d82 | Christos Stavrakakis | try:
|
64 | e0b68525 | Christos Stavrakakis | return func(self, *args, **kwargs) |
65 | db400d82 | Christos Stavrakakis | except (socket_error, spec_exceptions.ConnectionForced) as e: |
66 | 74d988b0 | Christos Stavrakakis | self.log.error('Connection Closed while in %s: %s', func.__name__, |
67 | 74d988b0 | Christos Stavrakakis | e) |
68 | db400d82 | Christos Stavrakakis | self.connect()
|
69 | db400d82 | Christos Stavrakakis | |
70 | db400d82 | Christos Stavrakakis | return wrapper
|
71 | db400d82 | Christos Stavrakakis | |
72 | db400d82 | Christos Stavrakakis | |
73 | db400d82 | Christos Stavrakakis | class AMQPPukaClient(object): |
74 | db400d82 | Christos Stavrakakis | """
|
75 | db400d82 | Christos Stavrakakis | AMQP generic client implementing most of the basic AMQP operations.
|
76 | db400d82 | Christos Stavrakakis |
|
77 | db400d82 | Christos Stavrakakis | """
|
78 | db400d82 | Christos Stavrakakis | def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
79 | a8858945 | Christos Stavrakakis | confirms=True, confirm_buffer=100, logger=None): |
80 | 2ef10562 | Christos Stavrakakis | """
|
81 | 2ef10562 | Christos Stavrakakis | Format hosts as "amqp://username:pass@host:port"
|
82 | 2ef10562 | Christos Stavrakakis | max_retries=0 defaults to unlimited retries
|
83 | 2ef10562 | Christos Stavrakakis |
|
84 | 2ef10562 | Christos Stavrakakis | """
|
85 | db400d82 | Christos Stavrakakis | |
86 | db400d82 | Christos Stavrakakis | self.hosts = hosts
|
87 | db400d82 | Christos Stavrakakis | shuffle(self.hosts)
|
88 | db400d82 | Christos Stavrakakis | |
89 | db400d82 | Christos Stavrakakis | self.max_retries = max_retries
|
90 | db400d82 | Christos Stavrakakis | self.confirms = confirms
|
91 | db400d82 | Christos Stavrakakis | self.confirm_buffer = confirm_buffer
|
92 | db400d82 | Christos Stavrakakis | |
93 | db400d82 | Christos Stavrakakis | self.connection = None |
94 | db400d82 | Christos Stavrakakis | self.channel = None |
95 | db400d82 | Christos Stavrakakis | self.consumers = {}
|
96 | db400d82 | Christos Stavrakakis | self.unacked = OrderedDict()
|
97 | db400d82 | Christos Stavrakakis | self.unsend = OrderedDict()
|
98 | db400d82 | Christos Stavrakakis | self.consume_promises = []
|
99 | 25649e21 | Christos Stavrakakis | self.exchanges = []
|
100 | a8858945 | Christos Stavrakakis | if logger:
|
101 | a8858945 | Christos Stavrakakis | self.log = logger
|
102 | a8858945 | Christos Stavrakakis | else:
|
103 | a8858945 | Christos Stavrakakis | logger = logging.getLogger("amqp")
|
104 | a8858945 | Christos Stavrakakis | logging.basicConfig() |
105 | a8858945 | Christos Stavrakakis | self.log = logger
|
106 | db400d82 | Christos Stavrakakis | |
107 | db400d82 | Christos Stavrakakis | def connect(self, retries=0): |
108 | 2ef10562 | Christos Stavrakakis | if self.max_retries and retries >= self.max_retries: |
109 | a8858945 | Christos Stavrakakis | self.log.error("Aborting after %d retries", retries) |
110 | 74d988b0 | Christos Stavrakakis | raise AMQPConnectionError('Aborting after %d connection failures.' |
111 | 2ef10562 | Christos Stavrakakis | % retries) |
112 | db400d82 | Christos Stavrakakis | return
|
113 | db400d82 | Christos Stavrakakis | |
114 | db400d82 | Christos Stavrakakis | # Pick up a host
|
115 | db400d82 | Christos Stavrakakis | host = self.hosts.pop()
|
116 | db400d82 | Christos Stavrakakis | self.hosts.insert(0, host) |
117 | db400d82 | Christos Stavrakakis | |
118 | db400d82 | Christos Stavrakakis | self.client = Client(host, pubacks=self.confirms) |
119 | db400d82 | Christos Stavrakakis | |
120 | db400d82 | Christos Stavrakakis | host = host.split('@')[-1] |
121 | a8858945 | Christos Stavrakakis | self.log.debug('Connecting to node %s' % host) |
122 | db400d82 | Christos Stavrakakis | |
123 | db400d82 | Christos Stavrakakis | try:
|
124 | db400d82 | Christos Stavrakakis | promise = self.client.connect()
|
125 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
126 | db400d82 | Christos Stavrakakis | except socket_error as e: |
127 | 2ef10562 | Christos Stavrakakis | if retries < len(self.hosts): |
128 | a8858945 | Christos Stavrakakis | self.log.warning('Cannot connect to host %s: %s', host, e) |
129 | 2ef10562 | Christos Stavrakakis | else:
|
130 | a8858945 | Christos Stavrakakis | self.log.error('Cannot connect to host %s: %s', host, e) |
131 | db400d82 | Christos Stavrakakis | sleep(1)
|
132 | db400d82 | Christos Stavrakakis | return self.connect(retries + 1) |
133 | db400d82 | Christos Stavrakakis | |
134 | a8858945 | Christos Stavrakakis | self.log.info('Successfully connected to host: %s', host) |
135 | db400d82 | Christos Stavrakakis | |
136 | 6d27eadd | Christos Stavrakakis | # Setup TCP keepalive option
|
137 | 6d27eadd | Christos Stavrakakis | self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
138 | 6d27eadd | Christos Stavrakakis | # Keepalive time
|
139 | 6d27eadd | Christos Stavrakakis | self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20) |
140 | 6d27eadd | Christos Stavrakakis | # Keepalive interval
|
141 | 6d27eadd | Christos Stavrakakis | self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2) |
142 | 6d27eadd | Christos Stavrakakis | # Keepalive retry
|
143 | 6d27eadd | Christos Stavrakakis | self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10) |
144 | 6d27eadd | Christos Stavrakakis | |
145 | a8858945 | Christos Stavrakakis | self.log.info('Creating channel') |
146 | db400d82 | Christos Stavrakakis | |
147 | b1bb9251 | Christos Stavrakakis | # Clear consume_promises each time connecting, since they are related
|
148 | b1bb9251 | Christos Stavrakakis | # to the connection object
|
149 | b1bb9251 | Christos Stavrakakis | self.consume_promises = []
|
150 | b1bb9251 | Christos Stavrakakis | |
151 | db400d82 | Christos Stavrakakis | if self.unacked: |
152 | db400d82 | Christos Stavrakakis | self._resend_unacked_messages()
|
153 | db400d82 | Christos Stavrakakis | |
154 | db400d82 | Christos Stavrakakis | if self.unsend: |
155 | db400d82 | Christos Stavrakakis | self._resend_unsend_messages()
|
156 | db400d82 | Christos Stavrakakis | |
157 | db400d82 | Christos Stavrakakis | if self.consumers: |
158 | db400d82 | Christos Stavrakakis | for queue, callback in self.consumers.items(): |
159 | db400d82 | Christos Stavrakakis | self.basic_consume(queue, callback)
|
160 | db400d82 | Christos Stavrakakis | |
161 | 25649e21 | Christos Stavrakakis | if self.exchanges: |
162 | 25649e21 | Christos Stavrakakis | exchanges = self.exchanges
|
163 | 25649e21 | Christos Stavrakakis | self.exchanges = []
|
164 | 25649e21 | Christos Stavrakakis | for exchange, type in exchanges: |
165 | 25649e21 | Christos Stavrakakis | self.exchange_declare(exchange, type) |
166 | 25649e21 | Christos Stavrakakis | |
167 | b1bb9251 | Christos Stavrakakis | @reconnect_decorator
|
168 | b1bb9251 | Christos Stavrakakis | def reconnect(self): |
169 | b1bb9251 | Christos Stavrakakis | self.close()
|
170 | b1bb9251 | Christos Stavrakakis | self.connect()
|
171 | b1bb9251 | Christos Stavrakakis | |
172 | db400d82 | Christos Stavrakakis | def exchange_declare(self, exchange, type='direct'): |
173 | db400d82 | Christos Stavrakakis | """Declare an exchange
|
174 | db400d82 | Christos Stavrakakis | @type exchange_name: string
|
175 | db400d82 | Christos Stavrakakis | @param exchange_name: name of the exchange
|
176 | db400d82 | Christos Stavrakakis | @type exchange_type: string
|
177 | db400d82 | Christos Stavrakakis | @param exhange_type: one of 'direct', 'topic', 'fanout'
|
178 | db400d82 | Christos Stavrakakis |
|
179 | db400d82 | Christos Stavrakakis | """
|
180 | a8858945 | Christos Stavrakakis | self.log.info('Declaring %s exchange: %s', type, exchange) |
181 | db400d82 | Christos Stavrakakis | promise = self.client.exchange_declare(exchange=exchange,
|
182 | db400d82 | Christos Stavrakakis | type=type,
|
183 | db400d82 | Christos Stavrakakis | durable=True,
|
184 | db400d82 | Christos Stavrakakis | auto_delete=False)
|
185 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
186 | 25649e21 | Christos Stavrakakis | self.exchanges.append((exchange, type)) |
187 | db400d82 | Christos Stavrakakis | |
188 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
189 | db400d82 | Christos Stavrakakis | def queue_declare(self, queue, exclusive=False, |
190 | 147c3d12 | Christos Stavrakakis | mirrored=True, mirrored_nodes='all', |
191 | 147c3d12 | Christos Stavrakakis | dead_letter_exchange=None):
|
192 | db400d82 | Christos Stavrakakis | """Declare a queue
|
193 | db400d82 | Christos Stavrakakis |
|
194 | db400d82 | Christos Stavrakakis | @type queue: string
|
195 | db400d82 | Christos Stavrakakis | @param queue: name of the queue
|
196 | db400d82 | Christos Stavrakakis | @param mirrored: whether the queue will be mirrored to other brokers
|
197 | db400d82 | Christos Stavrakakis | @param mirrored_nodes: the policy for the mirrored queue.
|
198 | db400d82 | Christos Stavrakakis | Available policies:
|
199 | db400d82 | Christos Stavrakakis | - 'all': The queue is mirrored to all nodes and the
|
200 | db400d82 | Christos Stavrakakis | master node is the one to which the client is
|
201 | db400d82 | Christos Stavrakakis | connected
|
202 | db400d82 | Christos Stavrakakis | - a list of nodes. The queue will be mirrored only to
|
203 | db400d82 | Christos Stavrakakis | the specified nodes, and the master will be the
|
204 | db400d82 | Christos Stavrakakis | first node in the list. Node names must be provided
|
205 | db400d82 | Christos Stavrakakis | and not host IP. example: [node1@rabbit,node2@rabbit]
|
206 | db400d82 | Christos Stavrakakis |
|
207 | db400d82 | Christos Stavrakakis | """
|
208 | a8858945 | Christos Stavrakakis | self.log.info('Declaring queue: %s', queue) |
209 | db400d82 | Christos Stavrakakis | |
210 | db400d82 | Christos Stavrakakis | if mirrored:
|
211 | db400d82 | Christos Stavrakakis | if mirrored_nodes == 'all': |
212 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'all'} |
213 | db400d82 | Christos Stavrakakis | elif isinstance(mirrored_nodes, list): |
214 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'nodes', |
215 | 74d988b0 | Christos Stavrakakis | 'x-ha-policy-params': mirrored_nodes}
|
216 | db400d82 | Christos Stavrakakis | else:
|
217 | db400d82 | Christos Stavrakakis | raise AttributeError |
218 | db400d82 | Christos Stavrakakis | else:
|
219 | db400d82 | Christos Stavrakakis | arguments = {} |
220 | db400d82 | Christos Stavrakakis | |
221 | 147c3d12 | Christos Stavrakakis | if dead_letter_exchange:
|
222 | 147c3d12 | Christos Stavrakakis | arguments['x-dead-letter-exchange'] = dead_letter_exchange
|
223 | 147c3d12 | Christos Stavrakakis | |
224 | db400d82 | Christos Stavrakakis | promise = self.client.queue_declare(queue=queue, durable=True, |
225 | db400d82 | Christos Stavrakakis | exclusive=exclusive, |
226 | db400d82 | Christos Stavrakakis | auto_delete=False,
|
227 | db400d82 | Christos Stavrakakis | arguments=arguments) |
228 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
229 | db400d82 | Christos Stavrakakis | |
230 | db400d82 | Christos Stavrakakis | def queue_bind(self, queue, exchange, routing_key): |
231 | a8858945 | Christos Stavrakakis | self.log.debug('Binding queue %s to exchange %s with key %s' |
232 | a8858945 | Christos Stavrakakis | % (queue, exchange, routing_key)) |
233 | db400d82 | Christos Stavrakakis | promise = self.client.queue_bind(exchange=exchange, queue=queue,
|
234 | db400d82 | Christos Stavrakakis | routing_key=routing_key) |
235 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
236 | db400d82 | Christos Stavrakakis | |
237 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
238 | 147c3d12 | Christos Stavrakakis | def basic_publish(self, exchange, routing_key, body, headers={}): |
239 | db400d82 | Christos Stavrakakis | """Publish a message with a specific routing key """
|
240 | 147c3d12 | Christos Stavrakakis | self._publish(exchange, routing_key, body, headers)
|
241 | db400d82 | Christos Stavrakakis | |
242 | db400d82 | Christos Stavrakakis | self.flush_buffer()
|
243 | db400d82 | Christos Stavrakakis | |
244 | db400d82 | Christos Stavrakakis | if self.confirms and len(self.unacked) >= self.confirm_buffer: |
245 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
246 | db400d82 | Christos Stavrakakis | |
247 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
248 | db400d82 | Christos Stavrakakis | def basic_publish_multi(self, exchange, routing_key, bodies): |
249 | db400d82 | Christos Stavrakakis | for body in bodies: |
250 | db400d82 | Christos Stavrakakis | self.unsend[body] = (exchange, routing_key)
|
251 | db400d82 | Christos Stavrakakis | |
252 | db400d82 | Christos Stavrakakis | for body in bodies: |
253 | db400d82 | Christos Stavrakakis | self._publish(exchange, routing_key, body)
|
254 | db400d82 | Christos Stavrakakis | self.unsend.pop(body)
|
255 | db400d82 | Christos Stavrakakis | |
256 | db400d82 | Christos Stavrakakis | self.flush_buffer()
|
257 | db400d82 | Christos Stavrakakis | |
258 | db400d82 | Christos Stavrakakis | if self.confirms: |
259 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
260 | db400d82 | Christos Stavrakakis | |
261 | 147c3d12 | Christos Stavrakakis | def _publish(self, exchange, routing_key, body, headers={}): |
262 | db400d82 | Christos Stavrakakis | # Persisent messages by default!
|
263 | db400d82 | Christos Stavrakakis | headers['delivery_mode'] = 2 |
264 | db400d82 | Christos Stavrakakis | promise = self.client.basic_publish(exchange=exchange,
|
265 | db400d82 | Christos Stavrakakis | routing_key=routing_key, |
266 | db400d82 | Christos Stavrakakis | body=body, headers=headers) |
267 | db400d82 | Christos Stavrakakis | |
268 | db400d82 | Christos Stavrakakis | if self.confirms: |
269 | db400d82 | Christos Stavrakakis | self.unacked[promise] = (exchange, routing_key, body)
|
270 | db400d82 | Christos Stavrakakis | |
271 | db400d82 | Christos Stavrakakis | return promise
|
272 | db400d82 | Christos Stavrakakis | |
273 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
274 | db400d82 | Christos Stavrakakis | def flush_buffer(self): |
275 | db400d82 | Christos Stavrakakis | while self.client.needs_write(): |
276 | db400d82 | Christos Stavrakakis | self.client.on_write()
|
277 | db400d82 | Christos Stavrakakis | |
278 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
279 | db400d82 | Christos Stavrakakis | def get_confirms(self): |
280 | db400d82 | Christos Stavrakakis | for promise in self.unacked.keys(): |
281 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
282 | db400d82 | Christos Stavrakakis | self.unacked.pop(promise)
|
283 | db400d82 | Christos Stavrakakis | |
284 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
285 | db400d82 | Christos Stavrakakis | def _resend_unacked_messages(self): |
286 | db400d82 | Christos Stavrakakis | """Resend unacked messages in case of a connection failure."""
|
287 | db400d82 | Christos Stavrakakis | msgs = self.unacked.values()
|
288 | db400d82 | Christos Stavrakakis | self.unacked.clear()
|
289 | db400d82 | Christos Stavrakakis | for exchange, routing_key, body in msgs: |
290 | a8858945 | Christos Stavrakakis | self.log.debug('Resending message %s' % body) |
291 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
292 | db400d82 | Christos Stavrakakis | |
293 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
294 | db400d82 | Christos Stavrakakis | def _resend_unsend_messages(self): |
295 | db400d82 | Christos Stavrakakis | """Resend unsend messages in case of a connection failure."""
|
296 | db400d82 | Christos Stavrakakis | for body in self.unsend.keys(): |
297 | db400d82 | Christos Stavrakakis | (exchange, routing_key) = self.unsend[body]
|
298 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
299 | db400d82 | Christos Stavrakakis | self.unsend.pop(body)
|
300 | db400d82 | Christos Stavrakakis | |
301 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
302 | db400d82 | Christos Stavrakakis | def basic_consume(self, queue, callback, prefetch_count=0): |
303 | db400d82 | Christos Stavrakakis | """Consume from a queue.
|
304 | db400d82 | Christos Stavrakakis |
|
305 | db400d82 | Christos Stavrakakis | @type queue: string or list of strings
|
306 | db400d82 | Christos Stavrakakis | @param queue: the name or list of names from the queues to consume
|
307 | db400d82 | Christos Stavrakakis | @type callback: function
|
308 | db400d82 | Christos Stavrakakis | @param callback: the callback function to run when a message arrives
|
309 | db400d82 | Christos Stavrakakis |
|
310 | db400d82 | Christos Stavrakakis | """
|
311 | db400d82 | Christos Stavrakakis | # Store the queues and the callback
|
312 | db400d82 | Christos Stavrakakis | self.consumers[queue] = callback
|
313 | db400d82 | Christos Stavrakakis | |
314 | db400d82 | Christos Stavrakakis | def handle_delivery(promise, msg): |
315 | db400d82 | Christos Stavrakakis | """Hide promises and messages without body"""
|
316 | db400d82 | Christos Stavrakakis | if 'body' in msg: |
317 | db400d82 | Christos Stavrakakis | callback(self, msg)
|
318 | db400d82 | Christos Stavrakakis | else:
|
319 | a8858945 | Christos Stavrakakis | self.log.debug("Message without body %s" % msg) |
320 | db400d82 | Christos Stavrakakis | raise socket_error
|
321 | db400d82 | Christos Stavrakakis | |
322 | db400d82 | Christos Stavrakakis | consume_promise = \ |
323 | 74d988b0 | Christos Stavrakakis | self.client.basic_consume(queue=queue,
|
324 | 74d988b0 | Christos Stavrakakis | prefetch_count=prefetch_count, |
325 | 74d988b0 | Christos Stavrakakis | callback=handle_delivery) |
326 | db400d82 | Christos Stavrakakis | |
327 | db400d82 | Christos Stavrakakis | self.consume_promises.append(consume_promise)
|
328 | db400d82 | Christos Stavrakakis | return consume_promise
|
329 | db400d82 | Christos Stavrakakis | |
330 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
331 | db400d82 | Christos Stavrakakis | def basic_wait(self, promise=None, timeout=0): |
332 | db400d82 | Christos Stavrakakis | """Wait for messages from the queues declared by basic_consume.
|
333 | db400d82 | Christos Stavrakakis |
|
334 | db400d82 | Christos Stavrakakis | This function will block until a message arrives from the queues that
|
335 | db400d82 | Christos Stavrakakis | have been declared with basic_consume. If the optional arguments
|
336 | db400d82 | Christos Stavrakakis | 'promise' is given, only messages for this promise will be delivered.
|
337 | db400d82 | Christos Stavrakakis |
|
338 | db400d82 | Christos Stavrakakis | """
|
339 | db400d82 | Christos Stavrakakis | if promise is not None: |
340 | b1bb9251 | Christos Stavrakakis | return self.client.wait(promise, timeout) |
341 | db400d82 | Christos Stavrakakis | else:
|
342 | b1bb9251 | Christos Stavrakakis | return self.client.wait(self.consume_promises, timeout) |
343 | db400d82 | Christos Stavrakakis | |
344 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
345 | db400d82 | Christos Stavrakakis | def basic_get(self, queue): |
346 | db400d82 | Christos Stavrakakis | """Get a single message from a queue.
|
347 | db400d82 | Christos Stavrakakis |
|
348 | db400d82 | Christos Stavrakakis | This is a non-blocking method for getting messages from a queue.
|
349 | db400d82 | Christos Stavrakakis | It will return None if the queue is empty.
|
350 | db400d82 | Christos Stavrakakis |
|
351 | db400d82 | Christos Stavrakakis | """
|
352 | db400d82 | Christos Stavrakakis | get_promise = self.client.basic_get(queue=queue)
|
353 | db400d82 | Christos Stavrakakis | result = self.client.wait(get_promise)
|
354 | db400d82 | Christos Stavrakakis | if 'empty' in result: |
355 | db400d82 | Christos Stavrakakis | # The queue is empty
|
356 | db400d82 | Christos Stavrakakis | return None |
357 | db400d82 | Christos Stavrakakis | else:
|
358 | db400d82 | Christos Stavrakakis | return result
|
359 | db400d82 | Christos Stavrakakis | |
360 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
361 | db400d82 | Christos Stavrakakis | def basic_ack(self, message): |
362 | db400d82 | Christos Stavrakakis | self.client.basic_ack(message)
|
363 | db400d82 | Christos Stavrakakis | |
364 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
365 | db400d82 | Christos Stavrakakis | def basic_nack(self, message): |
366 | 257b694d | Christos Stavrakakis | self.client.basic_ack(message)
|
367 | 257b694d | Christos Stavrakakis | |
368 | 257b694d | Christos Stavrakakis | @reconnect_decorator
|
369 | 257b694d | Christos Stavrakakis | def basic_reject(self, message, requeue=False): |
370 | 257b694d | Christos Stavrakakis | """Reject a message.
|
371 | 257b694d | Christos Stavrakakis |
|
372 | 257b694d | Christos Stavrakakis | If requeue option is False and a dead letter exchange is associated
|
373 | 257b694d | Christos Stavrakakis | with the queue, the message will be routed to the dead letter exchange.
|
374 | 257b694d | Christos Stavrakakis |
|
375 | 257b694d | Christos Stavrakakis | """
|
376 | 257b694d | Christos Stavrakakis | self.client.basic_reject(message, requeue=requeue)
|
377 | db400d82 | Christos Stavrakakis | |
378 | db400d82 | Christos Stavrakakis | def close(self): |
379 | db400d82 | Christos Stavrakakis | """Check that messages have been send and close the connection."""
|
380 | a8858945 | Christos Stavrakakis | self.log.debug("Closing connection to %s", self.client.host) |
381 | db400d82 | Christos Stavrakakis | try:
|
382 | db400d82 | Christos Stavrakakis | if self.confirms: |
383 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
384 | db400d82 | Christos Stavrakakis | close_promise = self.client.close()
|
385 | db400d82 | Christos Stavrakakis | self.client.wait(close_promise)
|
386 | db400d82 | Christos Stavrakakis | except (socket_error, spec_exceptions.ConnectionForced) as e: |
387 | a8858945 | Christos Stavrakakis | self.log.error('Connection closed while closing connection:%s', e) |
388 | db400d82 | Christos Stavrakakis | |
389 | db400d82 | Christos Stavrakakis | def queue_delete(self, queue, if_unused=True, if_empty=True): |
390 | db400d82 | Christos Stavrakakis | """Delete a queue.
|
391 | db400d82 | Christos Stavrakakis |
|
392 | db400d82 | Christos Stavrakakis | Returns False if the queue does not exist
|
393 | db400d82 | Christos Stavrakakis | """
|
394 | db400d82 | Christos Stavrakakis | try:
|
395 | db400d82 | Christos Stavrakakis | promise = self.client.queue_delete(queue=queue,
|
396 | db400d82 | Christos Stavrakakis | if_unused=if_unused, |
397 | db400d82 | Christos Stavrakakis | if_empty=if_empty) |
398 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
399 | db400d82 | Christos Stavrakakis | return True |
400 | db400d82 | Christos Stavrakakis | except spec_exceptions.NotFound:
|
401 | a8858945 | Christos Stavrakakis | self.log.info("Queue %s does not exist", queue) |
402 | db400d82 | Christos Stavrakakis | return False |
403 | db400d82 | Christos Stavrakakis | |
404 | db400d82 | Christos Stavrakakis | def exchange_delete(self, exchange, if_unused=True): |
405 | db400d82 | Christos Stavrakakis | """Delete an exchange."""
|
406 | db400d82 | Christos Stavrakakis | try:
|
407 | db400d82 | Christos Stavrakakis | |
408 | db400d82 | Christos Stavrakakis | promise = self.client.exchange_delete(exchange=exchange,
|
409 | db400d82 | Christos Stavrakakis | if_unused=if_unused) |
410 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
411 | db400d82 | Christos Stavrakakis | return True |
412 | db400d82 | Christos Stavrakakis | except spec_exceptions.NotFound:
|
413 | a8858945 | Christos Stavrakakis | self.log.info("Exchange %s does not exist", exchange) |
414 | db400d82 | Christos Stavrakakis | return False |
415 | db400d82 | Christos Stavrakakis | |
416 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
417 | db400d82 | Christos Stavrakakis | def basic_cancel(self, promise=None): |
418 | db400d82 | Christos Stavrakakis | """Cancel consuming from a queue. """
|
419 | db400d82 | Christos Stavrakakis | if promise is not None: |
420 | db400d82 | Christos Stavrakakis | self.client.basic_cancel(promise)
|
421 | db400d82 | Christos Stavrakakis | else:
|
422 | db400d82 | Christos Stavrakakis | for promise in self.consume_promises: |
423 | db400d82 | Christos Stavrakakis | self.client.basic_cancel(promise)
|
424 | db400d82 | Christos Stavrakakis | |
425 | db400d82 | Christos Stavrakakis | |
426 | b537ac01 | Christos Stavrakakis | class AMQPConnectionError(Exception): |
427 | db400d82 | Christos Stavrakakis | pass |