root / snf-common / synnefo / lib / amqp_puka.py @ d01cd522
History | View | Annotate | Download (13.4 kB)
1 | db400d82 | Christos Stavrakakis | # Copyright 2012 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | db400d82 | Christos Stavrakakis | #
|
3 | db400d82 | Christos Stavrakakis | # Redistribution and use in source and binary forms, with or
|
4 | db400d82 | Christos Stavrakakis | # without modification, are permitted provided that the following
|
5 | db400d82 | Christos Stavrakakis | # conditions are met:
|
6 | db400d82 | Christos Stavrakakis | #
|
7 | db400d82 | Christos Stavrakakis | # 1. Redistributions of source code must retain the above
|
8 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
9 | db400d82 | Christos Stavrakakis | # disclaimer.
|
10 | db400d82 | Christos Stavrakakis | #
|
11 | db400d82 | Christos Stavrakakis | # 2. Redistributions in binary form must reproduce the above
|
12 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
13 | db400d82 | Christos Stavrakakis | # disclaimer in the documentation and/or other materials
|
14 | db400d82 | Christos Stavrakakis | # provided with the distribution.
|
15 | db400d82 | Christos Stavrakakis | #
|
16 | db400d82 | Christos Stavrakakis | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | db400d82 | Christos Stavrakakis | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | db400d82 | Christos Stavrakakis | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | db400d82 | Christos Stavrakakis | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | db400d82 | Christos Stavrakakis | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | db400d82 | Christos Stavrakakis | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | db400d82 | Christos Stavrakakis | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | db400d82 | Christos Stavrakakis | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | db400d82 | Christos Stavrakakis | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | db400d82 | Christos Stavrakakis | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | db400d82 | Christos Stavrakakis | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | db400d82 | Christos Stavrakakis | # POSSIBILITY OF SUCH DAMAGE.
|
28 | db400d82 | Christos Stavrakakis | #
|
29 | db400d82 | Christos Stavrakakis | # The views and conclusions contained in the software and
|
30 | db400d82 | Christos Stavrakakis | # documentation are those of the authors and should not be
|
31 | db400d82 | Christos Stavrakakis | # interpreted as representing official policies, either expressed
|
32 | db400d82 | Christos Stavrakakis | # or implied, of GRNET S.A.
|
33 | db400d82 | Christos Stavrakakis | |
34 | db400d82 | Christos Stavrakakis | """ Module implementing connection and communication with an AMQP broker.
|
35 | db400d82 | Christos Stavrakakis |
|
36 | db400d82 | Christos Stavrakakis | AMQP Client's implemented by this module silenty detect connection failures and
|
37 | db400d82 | Christos Stavrakakis | try to reconnect to any available broker. Also publishing takes advantage of
|
38 | db400d82 | Christos Stavrakakis | publisher-confirms in order to guarantee that messages are properly delivered
|
39 | db400d82 | Christos Stavrakakis | to the broker.
|
40 | db400d82 | Christos Stavrakakis |
|
41 | db400d82 | Christos Stavrakakis | """
|
42 | db400d82 | Christos Stavrakakis | |
43 | db400d82 | Christos Stavrakakis | import logging |
44 | db400d82 | Christos Stavrakakis | |
45 | db400d82 | Christos Stavrakakis | from puka import Client |
46 | db400d82 | Christos Stavrakakis | from puka import spec_exceptions |
47 | db400d82 | Christos Stavrakakis | from socket import error as socket_error |
48 | db400d82 | Christos Stavrakakis | from time import sleep |
49 | db400d82 | Christos Stavrakakis | from random import shuffle |
50 | db400d82 | Christos Stavrakakis | from functools import wraps |
51 | db400d82 | Christos Stavrakakis | from ordereddict import OrderedDict |
52 | db400d82 | Christos Stavrakakis | from synnefo import settings |
53 | db400d82 | Christos Stavrakakis | |
54 | db400d82 | Christos Stavrakakis | logger = logging.getLogger() |
55 | db400d82 | Christos Stavrakakis | |
56 | db400d82 | Christos Stavrakakis | |
57 | db400d82 | Christos Stavrakakis | def reconnect_decorator(func): |
58 | db400d82 | Christos Stavrakakis | """
|
59 | db400d82 | Christos Stavrakakis | Decorator for persistent connection with one or more AMQP brokers.
|
60 | db400d82 | Christos Stavrakakis |
|
61 | db400d82 | Christos Stavrakakis | """
|
62 | db400d82 | Christos Stavrakakis | @wraps(func)
|
63 | db400d82 | Christos Stavrakakis | def wrapper(self, *args, **kwargs): |
64 | db400d82 | Christos Stavrakakis | try:
|
65 | e0b68525 | Christos Stavrakakis | return func(self, *args, **kwargs) |
66 | db400d82 | Christos Stavrakakis | except (socket_error, spec_exceptions.ConnectionForced) as e: |
67 | db400d82 | Christos Stavrakakis | logger.error('Connection Closed while in %s: %s', func.__name__, e)
|
68 | db400d82 | Christos Stavrakakis | self.consume_promises = []
|
69 | db400d82 | Christos Stavrakakis | self.connect()
|
70 | db400d82 | Christos Stavrakakis | |
71 | db400d82 | Christos Stavrakakis | return wrapper
|
72 | db400d82 | Christos Stavrakakis | |
73 | db400d82 | Christos Stavrakakis | |
74 | db400d82 | Christos Stavrakakis | class AMQPPukaClient(object): |
75 | db400d82 | Christos Stavrakakis | """
|
76 | db400d82 | Christos Stavrakakis | AMQP generic client implementing most of the basic AMQP operations.
|
77 | db400d82 | Christos Stavrakakis |
|
78 | db400d82 | Christos Stavrakakis | """
|
79 | db400d82 | Christos Stavrakakis | def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
80 | db400d82 | Christos Stavrakakis | confirms=True, confirm_buffer=100): |
81 | 2ef10562 | Christos Stavrakakis | """
|
82 | 2ef10562 | Christos Stavrakakis | Format hosts as "amqp://username:pass@host:port"
|
83 | 2ef10562 | Christos Stavrakakis | max_retries=0 defaults to unlimited retries
|
84 | 2ef10562 | Christos Stavrakakis |
|
85 | 2ef10562 | Christos Stavrakakis | """
|
86 | db400d82 | Christos Stavrakakis | |
87 | db400d82 | Christos Stavrakakis | self.hosts = hosts
|
88 | db400d82 | Christos Stavrakakis | shuffle(self.hosts)
|
89 | db400d82 | Christos Stavrakakis | |
90 | db400d82 | Christos Stavrakakis | self.max_retries = max_retries
|
91 | db400d82 | Christos Stavrakakis | self.confirms = confirms
|
92 | db400d82 | Christos Stavrakakis | self.confirm_buffer = confirm_buffer
|
93 | db400d82 | Christos Stavrakakis | |
94 | db400d82 | Christos Stavrakakis | self.connection = None |
95 | db400d82 | Christos Stavrakakis | self.channel = None |
96 | db400d82 | Christos Stavrakakis | self.consumers = {}
|
97 | db400d82 | Christos Stavrakakis | self.unacked = OrderedDict()
|
98 | db400d82 | Christos Stavrakakis | self.unsend = OrderedDict()
|
99 | db400d82 | Christos Stavrakakis | self.consume_promises = []
|
100 | 25649e21 | Christos Stavrakakis | self.exchanges = []
|
101 | db400d82 | Christos Stavrakakis | |
102 | db400d82 | Christos Stavrakakis | def connect(self, retries=0): |
103 | 2ef10562 | Christos Stavrakakis | if self.max_retries and retries >= self.max_retries: |
104 | 2ef10562 | Christos Stavrakakis | logger.error("Aborting after %d retries", retries)
|
105 | db400d82 | Christos Stavrakakis | raise AMQPConnectionError('Aborting after %d connection failures.'\ |
106 | 2ef10562 | Christos Stavrakakis | % retries) |
107 | db400d82 | Christos Stavrakakis | return
|
108 | db400d82 | Christos Stavrakakis | |
109 | db400d82 | Christos Stavrakakis | # Pick up a host
|
110 | db400d82 | Christos Stavrakakis | host = self.hosts.pop()
|
111 | db400d82 | Christos Stavrakakis | self.hosts.insert(0, host) |
112 | db400d82 | Christos Stavrakakis | |
113 | db400d82 | Christos Stavrakakis | self.client = Client(host, pubacks=self.confirms) |
114 | db400d82 | Christos Stavrakakis | |
115 | db400d82 | Christos Stavrakakis | host = host.split('@')[-1] |
116 | db400d82 | Christos Stavrakakis | logger.debug('Connecting to node %s' % host)
|
117 | db400d82 | Christos Stavrakakis | |
118 | db400d82 | Christos Stavrakakis | try:
|
119 | db400d82 | Christos Stavrakakis | promise = self.client.connect()
|
120 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
121 | db400d82 | Christos Stavrakakis | except socket_error as e: |
122 | 2ef10562 | Christos Stavrakakis | if retries < len(self.hosts): |
123 | 2ef10562 | Christos Stavrakakis | logger.warning('Cannot connect to host %s: %s', host, e)
|
124 | 2ef10562 | Christos Stavrakakis | else:
|
125 | 2ef10562 | Christos Stavrakakis | logger.error('Cannot connect to host %s: %s', host, e)
|
126 | db400d82 | Christos Stavrakakis | sleep(1)
|
127 | db400d82 | Christos Stavrakakis | return self.connect(retries + 1) |
128 | db400d82 | Christos Stavrakakis | |
129 | db400d82 | Christos Stavrakakis | logger.info('Successfully connected to host: %s', host)
|
130 | db400d82 | Christos Stavrakakis | |
131 | db400d82 | Christos Stavrakakis | logger.info('Creating channel')
|
132 | db400d82 | Christos Stavrakakis | |
133 | db400d82 | Christos Stavrakakis | if self.unacked: |
134 | db400d82 | Christos Stavrakakis | self._resend_unacked_messages()
|
135 | db400d82 | Christos Stavrakakis | |
136 | db400d82 | Christos Stavrakakis | if self.unsend: |
137 | db400d82 | Christos Stavrakakis | self._resend_unsend_messages()
|
138 | db400d82 | Christos Stavrakakis | |
139 | db400d82 | Christos Stavrakakis | if self.consumers: |
140 | db400d82 | Christos Stavrakakis | for queue, callback in self.consumers.items(): |
141 | db400d82 | Christos Stavrakakis | self.basic_consume(queue, callback)
|
142 | db400d82 | Christos Stavrakakis | |
143 | 25649e21 | Christos Stavrakakis | if self.exchanges: |
144 | 25649e21 | Christos Stavrakakis | exchanges = self.exchanges
|
145 | 25649e21 | Christos Stavrakakis | self.exchanges = []
|
146 | 25649e21 | Christos Stavrakakis | for exchange, type in exchanges: |
147 | 25649e21 | Christos Stavrakakis | self.exchange_declare(exchange, type) |
148 | 25649e21 | Christos Stavrakakis | |
149 | db400d82 | Christos Stavrakakis | def exchange_declare(self, exchange, type='direct'): |
150 | db400d82 | Christos Stavrakakis | """Declare an exchange
|
151 | db400d82 | Christos Stavrakakis | @type exchange_name: string
|
152 | db400d82 | Christos Stavrakakis | @param exchange_name: name of the exchange
|
153 | db400d82 | Christos Stavrakakis | @type exchange_type: string
|
154 | db400d82 | Christos Stavrakakis | @param exhange_type: one of 'direct', 'topic', 'fanout'
|
155 | db400d82 | Christos Stavrakakis |
|
156 | db400d82 | Christos Stavrakakis | """
|
157 | db400d82 | Christos Stavrakakis | logger.info('Declaring %s exchange: %s', type, exchange) |
158 | db400d82 | Christos Stavrakakis | promise = self.client.exchange_declare(exchange=exchange,
|
159 | db400d82 | Christos Stavrakakis | type=type,
|
160 | db400d82 | Christos Stavrakakis | durable=True,
|
161 | db400d82 | Christos Stavrakakis | auto_delete=False)
|
162 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
163 | 25649e21 | Christos Stavrakakis | self.exchanges.append((exchange, type)) |
164 | db400d82 | Christos Stavrakakis | |
165 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
166 | db400d82 | Christos Stavrakakis | def queue_declare(self, queue, exclusive=False, |
167 | db400d82 | Christos Stavrakakis | mirrored=True, mirrored_nodes='all'): |
168 | db400d82 | Christos Stavrakakis | """Declare a queue
|
169 | db400d82 | Christos Stavrakakis |
|
170 | db400d82 | Christos Stavrakakis | @type queue: string
|
171 | db400d82 | Christos Stavrakakis | @param queue: name of the queue
|
172 | db400d82 | Christos Stavrakakis | @param mirrored: whether the queue will be mirrored to other brokers
|
173 | db400d82 | Christos Stavrakakis | @param mirrored_nodes: the policy for the mirrored queue.
|
174 | db400d82 | Christos Stavrakakis | Available policies:
|
175 | db400d82 | Christos Stavrakakis | - 'all': The queue is mirrored to all nodes and the
|
176 | db400d82 | Christos Stavrakakis | master node is the one to which the client is
|
177 | db400d82 | Christos Stavrakakis | connected
|
178 | db400d82 | Christos Stavrakakis | - a list of nodes. The queue will be mirrored only to
|
179 | db400d82 | Christos Stavrakakis | the specified nodes, and the master will be the
|
180 | db400d82 | Christos Stavrakakis | first node in the list. Node names must be provided
|
181 | db400d82 | Christos Stavrakakis | and not host IP. example: [node1@rabbit,node2@rabbit]
|
182 | db400d82 | Christos Stavrakakis |
|
183 | db400d82 | Christos Stavrakakis | """
|
184 | db400d82 | Christos Stavrakakis | logger.info('Declaring queue: %s', queue)
|
185 | db400d82 | Christos Stavrakakis | |
186 | db400d82 | Christos Stavrakakis | if mirrored:
|
187 | db400d82 | Christos Stavrakakis | if mirrored_nodes == 'all': |
188 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'all'} |
189 | db400d82 | Christos Stavrakakis | elif isinstance(mirrored_nodes, list): |
190 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'nodes', |
191 | db400d82 | Christos Stavrakakis | 'x-ha-policy-params': mirrored_nodes}
|
192 | db400d82 | Christos Stavrakakis | else:
|
193 | db400d82 | Christos Stavrakakis | raise AttributeError |
194 | db400d82 | Christos Stavrakakis | else:
|
195 | db400d82 | Christos Stavrakakis | arguments = {} |
196 | db400d82 | Christos Stavrakakis | |
197 | db400d82 | Christos Stavrakakis | promise = self.client.queue_declare(queue=queue, durable=True, |
198 | db400d82 | Christos Stavrakakis | exclusive=exclusive, |
199 | db400d82 | Christos Stavrakakis | auto_delete=False,
|
200 | db400d82 | Christos Stavrakakis | arguments=arguments) |
201 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
202 | db400d82 | Christos Stavrakakis | |
203 | db400d82 | Christos Stavrakakis | def queue_bind(self, queue, exchange, routing_key): |
204 | db400d82 | Christos Stavrakakis | logger.debug('Binding queue %s to exchange %s with key %s'
|
205 | db400d82 | Christos Stavrakakis | % (queue, exchange, routing_key)) |
206 | db400d82 | Christos Stavrakakis | promise = self.client.queue_bind(exchange=exchange, queue=queue,
|
207 | db400d82 | Christos Stavrakakis | routing_key=routing_key) |
208 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
209 | db400d82 | Christos Stavrakakis | |
210 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
211 | db400d82 | Christos Stavrakakis | def basic_publish(self, exchange, routing_key, body): |
212 | db400d82 | Christos Stavrakakis | """Publish a message with a specific routing key """
|
213 | db400d82 | Christos Stavrakakis | self._publish(exchange, routing_key, body)
|
214 | db400d82 | Christos Stavrakakis | |
215 | db400d82 | Christos Stavrakakis | self.flush_buffer()
|
216 | db400d82 | Christos Stavrakakis | |
217 | db400d82 | Christos Stavrakakis | if self.confirms and len(self.unacked) >= self.confirm_buffer: |
218 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
219 | db400d82 | Christos Stavrakakis | |
220 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
221 | db400d82 | Christos Stavrakakis | def basic_publish_multi(self, exchange, routing_key, bodies): |
222 | db400d82 | Christos Stavrakakis | for body in bodies: |
223 | db400d82 | Christos Stavrakakis | self.unsend[body] = (exchange, routing_key)
|
224 | db400d82 | Christos Stavrakakis | |
225 | db400d82 | Christos Stavrakakis | for body in bodies: |
226 | db400d82 | Christos Stavrakakis | self._publish(exchange, routing_key, body)
|
227 | db400d82 | Christos Stavrakakis | self.unsend.pop(body)
|
228 | db400d82 | Christos Stavrakakis | |
229 | db400d82 | Christos Stavrakakis | self.flush_buffer()
|
230 | db400d82 | Christos Stavrakakis | |
231 | db400d82 | Christos Stavrakakis | if self.confirms: |
232 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
233 | db400d82 | Christos Stavrakakis | |
234 | db400d82 | Christos Stavrakakis | def _publish(self, exchange, routing_key, body): |
235 | db400d82 | Christos Stavrakakis | # Persisent messages by default!
|
236 | db400d82 | Christos Stavrakakis | headers = {} |
237 | db400d82 | Christos Stavrakakis | headers['delivery_mode'] = 2 |
238 | db400d82 | Christos Stavrakakis | promise = self.client.basic_publish(exchange=exchange,
|
239 | db400d82 | Christos Stavrakakis | routing_key=routing_key, |
240 | db400d82 | Christos Stavrakakis | body=body, headers=headers) |
241 | db400d82 | Christos Stavrakakis | |
242 | db400d82 | Christos Stavrakakis | if self.confirms: |
243 | db400d82 | Christos Stavrakakis | self.unacked[promise] = (exchange, routing_key, body)
|
244 | db400d82 | Christos Stavrakakis | |
245 | db400d82 | Christos Stavrakakis | return promise
|
246 | db400d82 | Christos Stavrakakis | |
247 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
248 | db400d82 | Christos Stavrakakis | def flush_buffer(self): |
249 | db400d82 | Christos Stavrakakis | while self.client.needs_write(): |
250 | db400d82 | Christos Stavrakakis | self.client.on_write()
|
251 | db400d82 | Christos Stavrakakis | |
252 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
253 | db400d82 | Christos Stavrakakis | def get_confirms(self): |
254 | db400d82 | Christos Stavrakakis | for promise in self.unacked.keys(): |
255 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
256 | db400d82 | Christos Stavrakakis | self.unacked.pop(promise)
|
257 | db400d82 | Christos Stavrakakis | |
258 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
259 | db400d82 | Christos Stavrakakis | def _resend_unacked_messages(self): |
260 | db400d82 | Christos Stavrakakis | """Resend unacked messages in case of a connection failure."""
|
261 | db400d82 | Christos Stavrakakis | msgs = self.unacked.values()
|
262 | db400d82 | Christos Stavrakakis | self.unacked.clear()
|
263 | db400d82 | Christos Stavrakakis | for exchange, routing_key, body in msgs: |
264 | db400d82 | Christos Stavrakakis | logger.debug('Resending message %s' % body)
|
265 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
266 | db400d82 | Christos Stavrakakis | |
267 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
268 | db400d82 | Christos Stavrakakis | def _resend_unsend_messages(self): |
269 | db400d82 | Christos Stavrakakis | """Resend unsend messages in case of a connection failure."""
|
270 | db400d82 | Christos Stavrakakis | for body in self.unsend.keys(): |
271 | db400d82 | Christos Stavrakakis | (exchange, routing_key) = self.unsend[body]
|
272 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
273 | db400d82 | Christos Stavrakakis | self.unsend.pop(body)
|
274 | db400d82 | Christos Stavrakakis | |
275 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
276 | db400d82 | Christos Stavrakakis | def basic_consume(self, queue, callback, prefetch_count=0): |
277 | db400d82 | Christos Stavrakakis | """Consume from a queue.
|
278 | db400d82 | Christos Stavrakakis |
|
279 | db400d82 | Christos Stavrakakis | @type queue: string or list of strings
|
280 | db400d82 | Christos Stavrakakis | @param queue: the name or list of names from the queues to consume
|
281 | db400d82 | Christos Stavrakakis | @type callback: function
|
282 | db400d82 | Christos Stavrakakis | @param callback: the callback function to run when a message arrives
|
283 | db400d82 | Christos Stavrakakis |
|
284 | db400d82 | Christos Stavrakakis | """
|
285 | db400d82 | Christos Stavrakakis | # Store the queues and the callback
|
286 | db400d82 | Christos Stavrakakis | self.consumers[queue] = callback
|
287 | db400d82 | Christos Stavrakakis | |
288 | db400d82 | Christos Stavrakakis | def handle_delivery(promise, msg): |
289 | db400d82 | Christos Stavrakakis | """Hide promises and messages without body"""
|
290 | db400d82 | Christos Stavrakakis | if 'body' in msg: |
291 | db400d82 | Christos Stavrakakis | callback(self, msg)
|
292 | db400d82 | Christos Stavrakakis | else:
|
293 | db400d82 | Christos Stavrakakis | logger.debug("Message without body %s" % msg)
|
294 | db400d82 | Christos Stavrakakis | raise socket_error
|
295 | db400d82 | Christos Stavrakakis | |
296 | db400d82 | Christos Stavrakakis | consume_promise = \ |
297 | db400d82 | Christos Stavrakakis | self.client.basic_consume(queue=queue,
|
298 | db400d82 | Christos Stavrakakis | prefetch_count=prefetch_count, |
299 | db400d82 | Christos Stavrakakis | callback=handle_delivery) |
300 | db400d82 | Christos Stavrakakis | |
301 | db400d82 | Christos Stavrakakis | self.consume_promises.append(consume_promise)
|
302 | db400d82 | Christos Stavrakakis | return consume_promise
|
303 | db400d82 | Christos Stavrakakis | |
304 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
305 | db400d82 | Christos Stavrakakis | def basic_wait(self, promise=None, timeout=0): |
306 | db400d82 | Christos Stavrakakis | """Wait for messages from the queues declared by basic_consume.
|
307 | db400d82 | Christos Stavrakakis |
|
308 | db400d82 | Christos Stavrakakis | This function will block until a message arrives from the queues that
|
309 | db400d82 | Christos Stavrakakis | have been declared with basic_consume. If the optional arguments
|
310 | db400d82 | Christos Stavrakakis | 'promise' is given, only messages for this promise will be delivered.
|
311 | db400d82 | Christos Stavrakakis |
|
312 | db400d82 | Christos Stavrakakis | """
|
313 | db400d82 | Christos Stavrakakis | if promise is not None: |
314 | db400d82 | Christos Stavrakakis | self.client.wait(promise, timeout)
|
315 | db400d82 | Christos Stavrakakis | else:
|
316 | db400d82 | Christos Stavrakakis | self.client.wait(self.consume_promises) |
317 | db400d82 | Christos Stavrakakis | |
318 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
319 | db400d82 | Christos Stavrakakis | def basic_get(self, queue): |
320 | db400d82 | Christos Stavrakakis | """Get a single message from a queue.
|
321 | db400d82 | Christos Stavrakakis |
|
322 | db400d82 | Christos Stavrakakis | This is a non-blocking method for getting messages from a queue.
|
323 | db400d82 | Christos Stavrakakis | It will return None if the queue is empty.
|
324 | db400d82 | Christos Stavrakakis |
|
325 | db400d82 | Christos Stavrakakis | """
|
326 | db400d82 | Christos Stavrakakis | get_promise = self.client.basic_get(queue=queue)
|
327 | db400d82 | Christos Stavrakakis | result = self.client.wait(get_promise)
|
328 | db400d82 | Christos Stavrakakis | if 'empty' in result: |
329 | db400d82 | Christos Stavrakakis | # The queue is empty
|
330 | db400d82 | Christos Stavrakakis | return None |
331 | db400d82 | Christos Stavrakakis | else:
|
332 | db400d82 | Christos Stavrakakis | return result
|
333 | db400d82 | Christos Stavrakakis | |
334 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
335 | db400d82 | Christos Stavrakakis | def basic_ack(self, message): |
336 | db400d82 | Christos Stavrakakis | self.client.basic_ack(message)
|
337 | db400d82 | Christos Stavrakakis | |
338 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
339 | db400d82 | Christos Stavrakakis | def basic_nack(self, message): |
340 | db400d82 | Christos Stavrakakis | #TODO:
|
341 | db400d82 | Christos Stavrakakis | pass
|
342 | db400d82 | Christos Stavrakakis | |
343 | db400d82 | Christos Stavrakakis | def close(self): |
344 | db400d82 | Christos Stavrakakis | """Check that messages have been send and close the connection."""
|
345 | db400d82 | Christos Stavrakakis | try:
|
346 | db400d82 | Christos Stavrakakis | if self.confirms: |
347 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
348 | db400d82 | Christos Stavrakakis | close_promise = self.client.close()
|
349 | db400d82 | Christos Stavrakakis | self.client.wait(close_promise)
|
350 | db400d82 | Christos Stavrakakis | except (socket_error, spec_exceptions.ConnectionForced) as e: |
351 | db400d82 | Christos Stavrakakis | logger.error('Connection closed while closing connection:%s',
|
352 | db400d82 | Christos Stavrakakis | e) |
353 | db400d82 | Christos Stavrakakis | |
354 | db400d82 | Christos Stavrakakis | def queue_delete(self, queue, if_unused=True, if_empty=True): |
355 | db400d82 | Christos Stavrakakis | """Delete a queue.
|
356 | db400d82 | Christos Stavrakakis |
|
357 | db400d82 | Christos Stavrakakis | Returns False if the queue does not exist
|
358 | db400d82 | Christos Stavrakakis | """
|
359 | db400d82 | Christos Stavrakakis | try:
|
360 | db400d82 | Christos Stavrakakis | promise = self.client.queue_delete(queue=queue,
|
361 | db400d82 | Christos Stavrakakis | if_unused=if_unused, |
362 | db400d82 | Christos Stavrakakis | if_empty=if_empty) |
363 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
364 | db400d82 | Christos Stavrakakis | return True |
365 | db400d82 | Christos Stavrakakis | except spec_exceptions.NotFound:
|
366 | db400d82 | Christos Stavrakakis | logger.info("Queue %s does not exist", queue)
|
367 | db400d82 | Christos Stavrakakis | return False |
368 | db400d82 | Christos Stavrakakis | |
369 | db400d82 | Christos Stavrakakis | def exchange_delete(self, exchange, if_unused=True): |
370 | db400d82 | Christos Stavrakakis | """Delete an exchange."""
|
371 | db400d82 | Christos Stavrakakis | try:
|
372 | db400d82 | Christos Stavrakakis | |
373 | db400d82 | Christos Stavrakakis | promise = self.client.exchange_delete(exchange=exchange,
|
374 | db400d82 | Christos Stavrakakis | if_unused=if_unused) |
375 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
376 | db400d82 | Christos Stavrakakis | return True |
377 | db400d82 | Christos Stavrakakis | except spec_exceptions.NotFound:
|
378 | db400d82 | Christos Stavrakakis | logger.info("Exchange %s does not exist", exchange)
|
379 | db400d82 | Christos Stavrakakis | return False |
380 | db400d82 | Christos Stavrakakis | |
381 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
382 | db400d82 | Christos Stavrakakis | def basic_cancel(self, promise=None): |
383 | db400d82 | Christos Stavrakakis | """Cancel consuming from a queue. """
|
384 | db400d82 | Christos Stavrakakis | if promise is not None: |
385 | db400d82 | Christos Stavrakakis | self.client.basic_cancel(promise)
|
386 | db400d82 | Christos Stavrakakis | else:
|
387 | db400d82 | Christos Stavrakakis | for promise in self.consume_promises: |
388 | db400d82 | Christos Stavrakakis | self.client.basic_cancel(promise)
|
389 | db400d82 | Christos Stavrakakis | |
390 | db400d82 | Christos Stavrakakis | |
391 | b537ac01 | Christos Stavrakakis | class AMQPConnectionError(Exception): |
392 | db400d82 | Christos Stavrakakis | pass |