root / snf-common / synnefo / lib / amqp_puka.py @ 5f6ad491
History | View | Annotate | Download (13.4 kB)
1 | db400d82 | Christos Stavrakakis | # Copyright 2012 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | db400d82 | Christos Stavrakakis | #
|
3 | db400d82 | Christos Stavrakakis | # Redistribution and use in source and binary forms, with or
|
4 | db400d82 | Christos Stavrakakis | # without modification, are permitted provided that the following
|
5 | db400d82 | Christos Stavrakakis | # conditions are met:
|
6 | db400d82 | Christos Stavrakakis | #
|
7 | db400d82 | Christos Stavrakakis | # 1. Redistributions of source code must retain the above
|
8 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
9 | db400d82 | Christos Stavrakakis | # disclaimer.
|
10 | db400d82 | Christos Stavrakakis | #
|
11 | db400d82 | Christos Stavrakakis | # 2. Redistributions in binary form must reproduce the above
|
12 | db400d82 | Christos Stavrakakis | # copyright notice, this list of conditions and the following
|
13 | db400d82 | Christos Stavrakakis | # disclaimer in the documentation and/or other materials
|
14 | db400d82 | Christos Stavrakakis | # provided with the distribution.
|
15 | db400d82 | Christos Stavrakakis | #
|
16 | db400d82 | Christos Stavrakakis | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | db400d82 | Christos Stavrakakis | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | db400d82 | Christos Stavrakakis | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | db400d82 | Christos Stavrakakis | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | db400d82 | Christos Stavrakakis | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | db400d82 | Christos Stavrakakis | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | db400d82 | Christos Stavrakakis | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | db400d82 | Christos Stavrakakis | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | db400d82 | Christos Stavrakakis | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | db400d82 | Christos Stavrakakis | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | db400d82 | Christos Stavrakakis | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | db400d82 | Christos Stavrakakis | # POSSIBILITY OF SUCH DAMAGE.
|
28 | db400d82 | Christos Stavrakakis | #
|
29 | db400d82 | Christos Stavrakakis | # The views and conclusions contained in the software and
|
30 | db400d82 | Christos Stavrakakis | # documentation are those of the authors and should not be
|
31 | db400d82 | Christos Stavrakakis | # interpreted as representing official policies, either expressed
|
32 | db400d82 | Christos Stavrakakis | # or implied, of GRNET S.A.
|
33 | db400d82 | Christos Stavrakakis | |
34 | db400d82 | Christos Stavrakakis | """ Module implementing connection and communication with an AMQP broker.
|
35 | db400d82 | Christos Stavrakakis |
|
36 | db400d82 | Christos Stavrakakis | AMQP Client's implemented by this module silenty detect connection failures and
|
37 | db400d82 | Christos Stavrakakis | try to reconnect to any available broker. Also publishing takes advantage of
|
38 | db400d82 | Christos Stavrakakis | publisher-confirms in order to guarantee that messages are properly delivered
|
39 | db400d82 | Christos Stavrakakis | to the broker.
|
40 | db400d82 | Christos Stavrakakis |
|
41 | db400d82 | Christos Stavrakakis | """
|
42 | db400d82 | Christos Stavrakakis | |
43 | db400d82 | Christos Stavrakakis | import logging |
44 | db400d82 | Christos Stavrakakis | |
45 | db400d82 | Christos Stavrakakis | from puka import Client |
46 | db400d82 | Christos Stavrakakis | from puka import spec_exceptions |
47 | db400d82 | Christos Stavrakakis | from socket import error as socket_error |
48 | db400d82 | Christos Stavrakakis | from time import sleep |
49 | db400d82 | Christos Stavrakakis | from random import shuffle |
50 | db400d82 | Christos Stavrakakis | from functools import wraps |
51 | db400d82 | Christos Stavrakakis | from ordereddict import OrderedDict |
52 | db400d82 | Christos Stavrakakis | from synnefo import settings |
53 | db400d82 | Christos Stavrakakis | |
54 | db400d82 | Christos Stavrakakis | logging.basicConfig(level=logging.DEBUG, |
55 | db400d82 | Christos Stavrakakis | format="[%(levelname)s %(asctime)s] %(message)s")
|
56 | db400d82 | Christos Stavrakakis | logger = logging.getLogger() |
57 | db400d82 | Christos Stavrakakis | |
58 | db400d82 | Christos Stavrakakis | |
59 | db400d82 | Christos Stavrakakis | def reconnect_decorator(func): |
60 | db400d82 | Christos Stavrakakis | """
|
61 | db400d82 | Christos Stavrakakis | Decorator for persistent connection with one or more AMQP brokers.
|
62 | db400d82 | Christos Stavrakakis |
|
63 | db400d82 | Christos Stavrakakis | """
|
64 | db400d82 | Christos Stavrakakis | @wraps(func)
|
65 | db400d82 | Christos Stavrakakis | def wrapper(self, *args, **kwargs): |
66 | db400d82 | Christos Stavrakakis | try:
|
67 | e0b68525 | Christos Stavrakakis | return func(self, *args, **kwargs) |
68 | db400d82 | Christos Stavrakakis | except (socket_error, spec_exceptions.ConnectionForced) as e: |
69 | db400d82 | Christos Stavrakakis | logger.error('Connection Closed while in %s: %s', func.__name__, e)
|
70 | db400d82 | Christos Stavrakakis | self.consume_promises = []
|
71 | db400d82 | Christos Stavrakakis | self.connect()
|
72 | db400d82 | Christos Stavrakakis | |
73 | db400d82 | Christos Stavrakakis | return wrapper
|
74 | db400d82 | Christos Stavrakakis | |
75 | db400d82 | Christos Stavrakakis | |
76 | db400d82 | Christos Stavrakakis | class AMQPPukaClient(object): |
77 | db400d82 | Christos Stavrakakis | """
|
78 | db400d82 | Christos Stavrakakis | AMQP generic client implementing most of the basic AMQP operations.
|
79 | db400d82 | Christos Stavrakakis |
|
80 | db400d82 | Christos Stavrakakis | """
|
81 | db400d82 | Christos Stavrakakis | def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
82 | db400d82 | Christos Stavrakakis | confirms=True, confirm_buffer=100): |
83 | db400d82 | Christos Stavrakakis | """Format hosts as "amqp://username:pass@host:port" """
|
84 | db400d82 | Christos Stavrakakis | |
85 | db400d82 | Christos Stavrakakis | self.hosts = hosts
|
86 | db400d82 | Christos Stavrakakis | shuffle(self.hosts)
|
87 | db400d82 | Christos Stavrakakis | |
88 | db400d82 | Christos Stavrakakis | self.max_retries = max_retries
|
89 | db400d82 | Christos Stavrakakis | self.confirms = confirms
|
90 | db400d82 | Christos Stavrakakis | self.confirm_buffer = confirm_buffer
|
91 | db400d82 | Christos Stavrakakis | |
92 | db400d82 | Christos Stavrakakis | self.connection = None |
93 | db400d82 | Christos Stavrakakis | self.channel = None |
94 | db400d82 | Christos Stavrakakis | self.consumers = {}
|
95 | db400d82 | Christos Stavrakakis | self.unacked = OrderedDict()
|
96 | db400d82 | Christos Stavrakakis | self.unsend = OrderedDict()
|
97 | db400d82 | Christos Stavrakakis | self.consume_promises = []
|
98 | 25649e21 | Christos Stavrakakis | self.exchanges = []
|
99 | db400d82 | Christos Stavrakakis | |
100 | db400d82 | Christos Stavrakakis | def connect(self, retries=0): |
101 | db400d82 | Christos Stavrakakis | if retries > self.max_retries: |
102 | db400d82 | Christos Stavrakakis | logger.error("Aborting after %s retries", retries - 1) |
103 | db400d82 | Christos Stavrakakis | raise AMQPConnectionError('Aborting after %d connection failures.'\ |
104 | db400d82 | Christos Stavrakakis | % (retries - 1))
|
105 | db400d82 | Christos Stavrakakis | return
|
106 | db400d82 | Christos Stavrakakis | |
107 | db400d82 | Christos Stavrakakis | # Pick up a host
|
108 | db400d82 | Christos Stavrakakis | host = self.hosts.pop()
|
109 | db400d82 | Christos Stavrakakis | self.hosts.insert(0, host) |
110 | db400d82 | Christos Stavrakakis | |
111 | db400d82 | Christos Stavrakakis | self.client = Client(host, pubacks=self.confirms) |
112 | db400d82 | Christos Stavrakakis | |
113 | db400d82 | Christos Stavrakakis | host = host.split('@')[-1] |
114 | db400d82 | Christos Stavrakakis | logger.debug('Connecting to node %s' % host)
|
115 | db400d82 | Christos Stavrakakis | |
116 | db400d82 | Christos Stavrakakis | try:
|
117 | db400d82 | Christos Stavrakakis | promise = self.client.connect()
|
118 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
119 | db400d82 | Christos Stavrakakis | except socket_error as e: |
120 | db400d82 | Christos Stavrakakis | logger.error('Cannot connect to host %s: %s', host, e)
|
121 | db400d82 | Christos Stavrakakis | if retries > 2 * len(self.hosts): |
122 | db400d82 | Christos Stavrakakis | sleep(1)
|
123 | db400d82 | Christos Stavrakakis | return self.connect(retries + 1) |
124 | db400d82 | Christos Stavrakakis | |
125 | db400d82 | Christos Stavrakakis | logger.info('Successfully connected to host: %s', host)
|
126 | db400d82 | Christos Stavrakakis | |
127 | db400d82 | Christos Stavrakakis | logger.info('Creating channel')
|
128 | db400d82 | Christos Stavrakakis | |
129 | db400d82 | Christos Stavrakakis | if self.unacked: |
130 | db400d82 | Christos Stavrakakis | self._resend_unacked_messages()
|
131 | db400d82 | Christos Stavrakakis | |
132 | db400d82 | Christos Stavrakakis | if self.unsend: |
133 | db400d82 | Christos Stavrakakis | self._resend_unsend_messages()
|
134 | db400d82 | Christos Stavrakakis | |
135 | db400d82 | Christos Stavrakakis | if self.consumers: |
136 | db400d82 | Christos Stavrakakis | for queue, callback in self.consumers.items(): |
137 | db400d82 | Christos Stavrakakis | self.basic_consume(queue, callback)
|
138 | db400d82 | Christos Stavrakakis | |
139 | 25649e21 | Christos Stavrakakis | if self.exchanges: |
140 | 25649e21 | Christos Stavrakakis | exchanges = self.exchanges
|
141 | 25649e21 | Christos Stavrakakis | self.exchanges = []
|
142 | 25649e21 | Christos Stavrakakis | for exchange, type in exchanges: |
143 | 25649e21 | Christos Stavrakakis | self.exchange_declare(exchange, type) |
144 | 25649e21 | Christos Stavrakakis | |
145 | db400d82 | Christos Stavrakakis | def exchange_declare(self, exchange, type='direct'): |
146 | db400d82 | Christos Stavrakakis | """Declare an exchange
|
147 | db400d82 | Christos Stavrakakis | @type exchange_name: string
|
148 | db400d82 | Christos Stavrakakis | @param exchange_name: name of the exchange
|
149 | db400d82 | Christos Stavrakakis | @type exchange_type: string
|
150 | db400d82 | Christos Stavrakakis | @param exhange_type: one of 'direct', 'topic', 'fanout'
|
151 | db400d82 | Christos Stavrakakis |
|
152 | db400d82 | Christos Stavrakakis | """
|
153 | db400d82 | Christos Stavrakakis | logger.info('Declaring %s exchange: %s', type, exchange) |
154 | db400d82 | Christos Stavrakakis | promise = self.client.exchange_declare(exchange=exchange,
|
155 | db400d82 | Christos Stavrakakis | type=type,
|
156 | db400d82 | Christos Stavrakakis | durable=True,
|
157 | db400d82 | Christos Stavrakakis | auto_delete=False)
|
158 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
159 | 25649e21 | Christos Stavrakakis | self.exchanges.append((exchange, type)) |
160 | db400d82 | Christos Stavrakakis | |
161 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
162 | db400d82 | Christos Stavrakakis | def queue_declare(self, queue, exclusive=False, |
163 | db400d82 | Christos Stavrakakis | mirrored=True, mirrored_nodes='all'): |
164 | db400d82 | Christos Stavrakakis | """Declare a queue
|
165 | db400d82 | Christos Stavrakakis |
|
166 | db400d82 | Christos Stavrakakis | @type queue: string
|
167 | db400d82 | Christos Stavrakakis | @param queue: name of the queue
|
168 | db400d82 | Christos Stavrakakis | @param mirrored: whether the queue will be mirrored to other brokers
|
169 | db400d82 | Christos Stavrakakis | @param mirrored_nodes: the policy for the mirrored queue.
|
170 | db400d82 | Christos Stavrakakis | Available policies:
|
171 | db400d82 | Christos Stavrakakis | - 'all': The queue is mirrored to all nodes and the
|
172 | db400d82 | Christos Stavrakakis | master node is the one to which the client is
|
173 | db400d82 | Christos Stavrakakis | connected
|
174 | db400d82 | Christos Stavrakakis | - a list of nodes. The queue will be mirrored only to
|
175 | db400d82 | Christos Stavrakakis | the specified nodes, and the master will be the
|
176 | db400d82 | Christos Stavrakakis | first node in the list. Node names must be provided
|
177 | db400d82 | Christos Stavrakakis | and not host IP. example: [node1@rabbit,node2@rabbit]
|
178 | db400d82 | Christos Stavrakakis |
|
179 | db400d82 | Christos Stavrakakis | """
|
180 | db400d82 | Christos Stavrakakis | logger.info('Declaring queue: %s', queue)
|
181 | db400d82 | Christos Stavrakakis | |
182 | db400d82 | Christos Stavrakakis | if mirrored:
|
183 | db400d82 | Christos Stavrakakis | if mirrored_nodes == 'all': |
184 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'all'} |
185 | db400d82 | Christos Stavrakakis | elif isinstance(mirrored_nodes, list): |
186 | db400d82 | Christos Stavrakakis | arguments = {'x-ha-policy': 'nodes', |
187 | db400d82 | Christos Stavrakakis | 'x-ha-policy-params': mirrored_nodes}
|
188 | db400d82 | Christos Stavrakakis | else:
|
189 | db400d82 | Christos Stavrakakis | raise AttributeError |
190 | db400d82 | Christos Stavrakakis | else:
|
191 | db400d82 | Christos Stavrakakis | arguments = {} |
192 | db400d82 | Christos Stavrakakis | |
193 | db400d82 | Christos Stavrakakis | promise = self.client.queue_declare(queue=queue, durable=True, |
194 | db400d82 | Christos Stavrakakis | exclusive=exclusive, |
195 | db400d82 | Christos Stavrakakis | auto_delete=False,
|
196 | db400d82 | Christos Stavrakakis | arguments=arguments) |
197 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
198 | db400d82 | Christos Stavrakakis | |
199 | db400d82 | Christos Stavrakakis | def queue_bind(self, queue, exchange, routing_key): |
200 | db400d82 | Christos Stavrakakis | logger.debug('Binding queue %s to exchange %s with key %s'
|
201 | db400d82 | Christos Stavrakakis | % (queue, exchange, routing_key)) |
202 | db400d82 | Christos Stavrakakis | promise = self.client.queue_bind(exchange=exchange, queue=queue,
|
203 | db400d82 | Christos Stavrakakis | routing_key=routing_key) |
204 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
205 | db400d82 | Christos Stavrakakis | |
206 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
207 | db400d82 | Christos Stavrakakis | def basic_publish(self, exchange, routing_key, body): |
208 | db400d82 | Christos Stavrakakis | """Publish a message with a specific routing key """
|
209 | db400d82 | Christos Stavrakakis | self._publish(exchange, routing_key, body)
|
210 | db400d82 | Christos Stavrakakis | |
211 | db400d82 | Christos Stavrakakis | self.flush_buffer()
|
212 | db400d82 | Christos Stavrakakis | |
213 | db400d82 | Christos Stavrakakis | if self.confirms and len(self.unacked) >= self.confirm_buffer: |
214 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
215 | db400d82 | Christos Stavrakakis | |
216 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
217 | db400d82 | Christos Stavrakakis | def basic_publish_multi(self, exchange, routing_key, bodies): |
218 | db400d82 | Christos Stavrakakis | for body in bodies: |
219 | db400d82 | Christos Stavrakakis | self.unsend[body] = (exchange, routing_key)
|
220 | db400d82 | Christos Stavrakakis | |
221 | db400d82 | Christos Stavrakakis | for body in bodies: |
222 | db400d82 | Christos Stavrakakis | self._publish(exchange, routing_key, body)
|
223 | db400d82 | Christos Stavrakakis | self.unsend.pop(body)
|
224 | db400d82 | Christos Stavrakakis | |
225 | db400d82 | Christos Stavrakakis | self.flush_buffer()
|
226 | db400d82 | Christos Stavrakakis | |
227 | db400d82 | Christos Stavrakakis | if self.confirms: |
228 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
229 | db400d82 | Christos Stavrakakis | |
230 | db400d82 | Christos Stavrakakis | def _publish(self, exchange, routing_key, body): |
231 | db400d82 | Christos Stavrakakis | # Persisent messages by default!
|
232 | db400d82 | Christos Stavrakakis | headers = {} |
233 | db400d82 | Christos Stavrakakis | headers['delivery_mode'] = 2 |
234 | db400d82 | Christos Stavrakakis | promise = self.client.basic_publish(exchange=exchange,
|
235 | db400d82 | Christos Stavrakakis | routing_key=routing_key, |
236 | db400d82 | Christos Stavrakakis | body=body, headers=headers) |
237 | db400d82 | Christos Stavrakakis | |
238 | db400d82 | Christos Stavrakakis | if self.confirms: |
239 | db400d82 | Christos Stavrakakis | self.unacked[promise] = (exchange, routing_key, body)
|
240 | db400d82 | Christos Stavrakakis | |
241 | db400d82 | Christos Stavrakakis | return promise
|
242 | db400d82 | Christos Stavrakakis | |
243 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
244 | db400d82 | Christos Stavrakakis | def flush_buffer(self): |
245 | db400d82 | Christos Stavrakakis | while self.client.needs_write(): |
246 | db400d82 | Christos Stavrakakis | self.client.on_write()
|
247 | db400d82 | Christos Stavrakakis | |
248 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
249 | db400d82 | Christos Stavrakakis | def get_confirms(self): |
250 | db400d82 | Christos Stavrakakis | for promise in self.unacked.keys(): |
251 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
252 | db400d82 | Christos Stavrakakis | self.unacked.pop(promise)
|
253 | db400d82 | Christos Stavrakakis | |
254 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
255 | db400d82 | Christos Stavrakakis | def _resend_unacked_messages(self): |
256 | db400d82 | Christos Stavrakakis | """Resend unacked messages in case of a connection failure."""
|
257 | db400d82 | Christos Stavrakakis | msgs = self.unacked.values()
|
258 | db400d82 | Christos Stavrakakis | self.unacked.clear()
|
259 | db400d82 | Christos Stavrakakis | for exchange, routing_key, body in msgs: |
260 | db400d82 | Christos Stavrakakis | logger.debug('Resending message %s' % body)
|
261 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
262 | db400d82 | Christos Stavrakakis | |
263 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
264 | db400d82 | Christos Stavrakakis | def _resend_unsend_messages(self): |
265 | db400d82 | Christos Stavrakakis | """Resend unsend messages in case of a connection failure."""
|
266 | db400d82 | Christos Stavrakakis | for body in self.unsend.keys(): |
267 | db400d82 | Christos Stavrakakis | (exchange, routing_key) = self.unsend[body]
|
268 | db400d82 | Christos Stavrakakis | self.basic_publish(exchange, routing_key, body)
|
269 | db400d82 | Christos Stavrakakis | self.unsend.pop(body)
|
270 | db400d82 | Christos Stavrakakis | |
271 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
272 | db400d82 | Christos Stavrakakis | def basic_consume(self, queue, callback, prefetch_count=0): |
273 | db400d82 | Christos Stavrakakis | """Consume from a queue.
|
274 | db400d82 | Christos Stavrakakis |
|
275 | db400d82 | Christos Stavrakakis | @type queue: string or list of strings
|
276 | db400d82 | Christos Stavrakakis | @param queue: the name or list of names from the queues to consume
|
277 | db400d82 | Christos Stavrakakis | @type callback: function
|
278 | db400d82 | Christos Stavrakakis | @param callback: the callback function to run when a message arrives
|
279 | db400d82 | Christos Stavrakakis |
|
280 | db400d82 | Christos Stavrakakis | """
|
281 | db400d82 | Christos Stavrakakis | # Store the queues and the callback
|
282 | db400d82 | Christos Stavrakakis | self.consumers[queue] = callback
|
283 | db400d82 | Christos Stavrakakis | |
284 | db400d82 | Christos Stavrakakis | def handle_delivery(promise, msg): |
285 | db400d82 | Christos Stavrakakis | """Hide promises and messages without body"""
|
286 | db400d82 | Christos Stavrakakis | if 'body' in msg: |
287 | db400d82 | Christos Stavrakakis | callback(self, msg)
|
288 | db400d82 | Christos Stavrakakis | else:
|
289 | db400d82 | Christos Stavrakakis | logger.debug("Message without body %s" % msg)
|
290 | db400d82 | Christos Stavrakakis | raise socket_error
|
291 | db400d82 | Christos Stavrakakis | |
292 | db400d82 | Christos Stavrakakis | consume_promise = \ |
293 | db400d82 | Christos Stavrakakis | self.client.basic_consume(queue=queue,
|
294 | db400d82 | Christos Stavrakakis | prefetch_count=prefetch_count, |
295 | db400d82 | Christos Stavrakakis | callback=handle_delivery) |
296 | db400d82 | Christos Stavrakakis | |
297 | db400d82 | Christos Stavrakakis | self.consume_promises.append(consume_promise)
|
298 | db400d82 | Christos Stavrakakis | return consume_promise
|
299 | db400d82 | Christos Stavrakakis | |
300 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
301 | db400d82 | Christos Stavrakakis | def basic_wait(self, promise=None, timeout=0): |
302 | db400d82 | Christos Stavrakakis | """Wait for messages from the queues declared by basic_consume.
|
303 | db400d82 | Christos Stavrakakis |
|
304 | db400d82 | Christos Stavrakakis | This function will block until a message arrives from the queues that
|
305 | db400d82 | Christos Stavrakakis | have been declared with basic_consume. If the optional arguments
|
306 | db400d82 | Christos Stavrakakis | 'promise' is given, only messages for this promise will be delivered.
|
307 | db400d82 | Christos Stavrakakis |
|
308 | db400d82 | Christos Stavrakakis | """
|
309 | db400d82 | Christos Stavrakakis | if promise is not None: |
310 | db400d82 | Christos Stavrakakis | self.client.wait(promise, timeout)
|
311 | db400d82 | Christos Stavrakakis | else:
|
312 | db400d82 | Christos Stavrakakis | self.client.wait(self.consume_promises) |
313 | db400d82 | Christos Stavrakakis | |
314 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
315 | db400d82 | Christos Stavrakakis | def basic_get(self, queue): |
316 | db400d82 | Christos Stavrakakis | """Get a single message from a queue.
|
317 | db400d82 | Christos Stavrakakis |
|
318 | db400d82 | Christos Stavrakakis | This is a non-blocking method for getting messages from a queue.
|
319 | db400d82 | Christos Stavrakakis | It will return None if the queue is empty.
|
320 | db400d82 | Christos Stavrakakis |
|
321 | db400d82 | Christos Stavrakakis | """
|
322 | db400d82 | Christos Stavrakakis | get_promise = self.client.basic_get(queue=queue)
|
323 | db400d82 | Christos Stavrakakis | result = self.client.wait(get_promise)
|
324 | db400d82 | Christos Stavrakakis | if 'empty' in result: |
325 | db400d82 | Christos Stavrakakis | # The queue is empty
|
326 | db400d82 | Christos Stavrakakis | return None |
327 | db400d82 | Christos Stavrakakis | else:
|
328 | db400d82 | Christos Stavrakakis | return result
|
329 | db400d82 | Christos Stavrakakis | |
330 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
331 | db400d82 | Christos Stavrakakis | def basic_ack(self, message): |
332 | db400d82 | Christos Stavrakakis | self.client.basic_ack(message)
|
333 | db400d82 | Christos Stavrakakis | |
334 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
335 | db400d82 | Christos Stavrakakis | def basic_nack(self, message): |
336 | db400d82 | Christos Stavrakakis | #TODO:
|
337 | db400d82 | Christos Stavrakakis | pass
|
338 | db400d82 | Christos Stavrakakis | |
339 | db400d82 | Christos Stavrakakis | def close(self): |
340 | db400d82 | Christos Stavrakakis | """Check that messages have been send and close the connection."""
|
341 | db400d82 | Christos Stavrakakis | try:
|
342 | db400d82 | Christos Stavrakakis | if self.confirms: |
343 | db400d82 | Christos Stavrakakis | self.get_confirms()
|
344 | db400d82 | Christos Stavrakakis | close_promise = self.client.close()
|
345 | db400d82 | Christos Stavrakakis | self.client.wait(close_promise)
|
346 | db400d82 | Christos Stavrakakis | except (socket_error, spec_exceptions.ConnectionForced) as e: |
347 | db400d82 | Christos Stavrakakis | logger.error('Connection closed while closing connection:%s',
|
348 | db400d82 | Christos Stavrakakis | e) |
349 | db400d82 | Christos Stavrakakis | |
350 | db400d82 | Christos Stavrakakis | def queue_delete(self, queue, if_unused=True, if_empty=True): |
351 | db400d82 | Christos Stavrakakis | """Delete a queue.
|
352 | db400d82 | Christos Stavrakakis |
|
353 | db400d82 | Christos Stavrakakis | Returns False if the queue does not exist
|
354 | db400d82 | Christos Stavrakakis | """
|
355 | db400d82 | Christos Stavrakakis | try:
|
356 | db400d82 | Christos Stavrakakis | promise = self.client.queue_delete(queue=queue,
|
357 | db400d82 | Christos Stavrakakis | if_unused=if_unused, |
358 | db400d82 | Christos Stavrakakis | if_empty=if_empty) |
359 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
360 | db400d82 | Christos Stavrakakis | return True |
361 | db400d82 | Christos Stavrakakis | except spec_exceptions.NotFound:
|
362 | db400d82 | Christos Stavrakakis | logger.info("Queue %s does not exist", queue)
|
363 | db400d82 | Christos Stavrakakis | return False |
364 | db400d82 | Christos Stavrakakis | |
365 | db400d82 | Christos Stavrakakis | def exchange_delete(self, exchange, if_unused=True): |
366 | db400d82 | Christos Stavrakakis | """Delete an exchange."""
|
367 | db400d82 | Christos Stavrakakis | try:
|
368 | db400d82 | Christos Stavrakakis | |
369 | db400d82 | Christos Stavrakakis | promise = self.client.exchange_delete(exchange=exchange,
|
370 | db400d82 | Christos Stavrakakis | if_unused=if_unused) |
371 | db400d82 | Christos Stavrakakis | self.client.wait(promise)
|
372 | db400d82 | Christos Stavrakakis | return True |
373 | db400d82 | Christos Stavrakakis | except spec_exceptions.NotFound:
|
374 | db400d82 | Christos Stavrakakis | logger.info("Exchange %s does not exist", exchange)
|
375 | db400d82 | Christos Stavrakakis | return False |
376 | db400d82 | Christos Stavrakakis | |
377 | db400d82 | Christos Stavrakakis | @reconnect_decorator
|
378 | db400d82 | Christos Stavrakakis | def basic_cancel(self, promise=None): |
379 | db400d82 | Christos Stavrakakis | """Cancel consuming from a queue. """
|
380 | db400d82 | Christos Stavrakakis | if promise is not None: |
381 | db400d82 | Christos Stavrakakis | self.client.basic_cancel(promise)
|
382 | db400d82 | Christos Stavrakakis | else:
|
383 | db400d82 | Christos Stavrakakis | for promise in self.consume_promises: |
384 | db400d82 | Christos Stavrakakis | self.client.basic_cancel(promise)
|
385 | db400d82 | Christos Stavrakakis | |
386 | db400d82 | Christos Stavrakakis | |
387 | db400d82 | Christos Stavrakakis | class AMQPConnectionError(): |
388 | db400d82 | Christos Stavrakakis | pass |