Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp / amqp_puka.py @ 2e26966f

History | View | Annotate | Download (14.9 kB)

1 db400d82 Christos Stavrakakis
# Copyright 2012 GRNET S.A. All rights reserved.
2 db400d82 Christos Stavrakakis
#
3 db400d82 Christos Stavrakakis
# Redistribution and use in source and binary forms, with or
4 db400d82 Christos Stavrakakis
# without modification, are permitted provided that the following
5 db400d82 Christos Stavrakakis
# conditions are met:
6 db400d82 Christos Stavrakakis
#
7 db400d82 Christos Stavrakakis
#   1. Redistributions of source code must retain the above
8 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
9 db400d82 Christos Stavrakakis
#      disclaimer.
10 db400d82 Christos Stavrakakis
#
11 db400d82 Christos Stavrakakis
#   2. Redistributions in binary form must reproduce the above
12 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
13 db400d82 Christos Stavrakakis
#      disclaimer in the documentation and/or other materials
14 db400d82 Christos Stavrakakis
#      provided with the distribution.
15 db400d82 Christos Stavrakakis
#
16 db400d82 Christos Stavrakakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 db400d82 Christos Stavrakakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 db400d82 Christos Stavrakakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 db400d82 Christos Stavrakakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 db400d82 Christos Stavrakakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 db400d82 Christos Stavrakakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 db400d82 Christos Stavrakakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 db400d82 Christos Stavrakakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 db400d82 Christos Stavrakakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 db400d82 Christos Stavrakakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 db400d82 Christos Stavrakakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 db400d82 Christos Stavrakakis
# POSSIBILITY OF SUCH DAMAGE.
28 db400d82 Christos Stavrakakis
#
29 db400d82 Christos Stavrakakis
# The views and conclusions contained in the software and
30 db400d82 Christos Stavrakakis
# documentation are those of the authors and should not be
31 db400d82 Christos Stavrakakis
# interpreted as representing official policies, either expressed
32 db400d82 Christos Stavrakakis
# or implied, of GRNET S.A.
33 db400d82 Christos Stavrakakis
34 db400d82 Christos Stavrakakis
""" Module implementing connection and communication with an AMQP broker.
35 db400d82 Christos Stavrakakis

36 db400d82 Christos Stavrakakis
AMQP Client's implemented by this module silenty detect connection failures and
37 db400d82 Christos Stavrakakis
try to reconnect to any available broker. Also publishing takes advantage of
38 db400d82 Christos Stavrakakis
publisher-confirms in order to guarantee that messages are properly delivered
39 db400d82 Christos Stavrakakis
to the broker.
40 db400d82 Christos Stavrakakis

41 db400d82 Christos Stavrakakis
"""
42 db400d82 Christos Stavrakakis
43 db400d82 Christos Stavrakakis
import logging
44 db400d82 Christos Stavrakakis
45 db400d82 Christos Stavrakakis
from puka import Client
46 db400d82 Christos Stavrakakis
from puka import spec_exceptions
47 6d27eadd Christos Stavrakakis
import socket
48 db400d82 Christos Stavrakakis
from socket import error as socket_error
49 db400d82 Christos Stavrakakis
from time import sleep
50 db400d82 Christos Stavrakakis
from random import shuffle
51 db400d82 Christos Stavrakakis
from functools import wraps
52 7627ec6f Christos Stavrakakis
from synnefo.lib.ordereddict import OrderedDict
53 db400d82 Christos Stavrakakis
from synnefo import settings
54 db400d82 Christos Stavrakakis
55 db400d82 Christos Stavrakakis
56 db400d82 Christos Stavrakakis
def reconnect_decorator(func):
57 db400d82 Christos Stavrakakis
    """
58 db400d82 Christos Stavrakakis
    Decorator for persistent connection with one or more AMQP brokers.
59 db400d82 Christos Stavrakakis

60 db400d82 Christos Stavrakakis
    """
61 db400d82 Christos Stavrakakis
    @wraps(func)
62 db400d82 Christos Stavrakakis
    def wrapper(self, *args, **kwargs):
63 db400d82 Christos Stavrakakis
        try:
64 e0b68525 Christos Stavrakakis
            return func(self, *args, **kwargs)
65 db400d82 Christos Stavrakakis
        except (socket_error, spec_exceptions.ConnectionForced) as e:
66 74d988b0 Christos Stavrakakis
            self.log.error('Connection Closed while in %s: %s', func.__name__,
67 74d988b0 Christos Stavrakakis
                           e)
68 db400d82 Christos Stavrakakis
            self.connect()
69 db400d82 Christos Stavrakakis
70 db400d82 Christos Stavrakakis
    return wrapper
71 db400d82 Christos Stavrakakis
72 db400d82 Christos Stavrakakis
73 db400d82 Christos Stavrakakis
class AMQPPukaClient(object):
74 db400d82 Christos Stavrakakis
    """
75 db400d82 Christos Stavrakakis
    AMQP generic client implementing most of the basic AMQP operations.
76 db400d82 Christos Stavrakakis

77 db400d82 Christos Stavrakakis
    """
78 db400d82 Christos Stavrakakis
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
79 a8858945 Christos Stavrakakis
                 confirms=True, confirm_buffer=100, logger=None):
80 2ef10562 Christos Stavrakakis
        """
81 2ef10562 Christos Stavrakakis
        Format hosts as "amqp://username:pass@host:port"
82 2ef10562 Christos Stavrakakis
        max_retries=0 defaults to unlimited retries
83 2ef10562 Christos Stavrakakis

84 2ef10562 Christos Stavrakakis
        """
85 db400d82 Christos Stavrakakis
86 db400d82 Christos Stavrakakis
        self.hosts = hosts
87 db400d82 Christos Stavrakakis
        shuffle(self.hosts)
88 db400d82 Christos Stavrakakis
89 db400d82 Christos Stavrakakis
        self.max_retries = max_retries
90 db400d82 Christos Stavrakakis
        self.confirms = confirms
91 db400d82 Christos Stavrakakis
        self.confirm_buffer = confirm_buffer
92 db400d82 Christos Stavrakakis
93 db400d82 Christos Stavrakakis
        self.connection = None
94 db400d82 Christos Stavrakakis
        self.channel = None
95 db400d82 Christos Stavrakakis
        self.consumers = {}
96 db400d82 Christos Stavrakakis
        self.unacked = OrderedDict()
97 db400d82 Christos Stavrakakis
        self.unsend = OrderedDict()
98 db400d82 Christos Stavrakakis
        self.consume_promises = []
99 25649e21 Christos Stavrakakis
        self.exchanges = []
100 a8858945 Christos Stavrakakis
        if logger:
101 a8858945 Christos Stavrakakis
            self.log = logger
102 a8858945 Christos Stavrakakis
        else:
103 a8858945 Christos Stavrakakis
            logger = logging.getLogger("amqp")
104 a8858945 Christos Stavrakakis
            logging.basicConfig()
105 a8858945 Christos Stavrakakis
            self.log = logger
106 db400d82 Christos Stavrakakis
107 db400d82 Christos Stavrakakis
    def connect(self, retries=0):
108 2ef10562 Christos Stavrakakis
        if self.max_retries and retries >= self.max_retries:
109 a8858945 Christos Stavrakakis
            self.log.error("Aborting after %d retries", retries)
110 74d988b0 Christos Stavrakakis
            raise AMQPConnectionError('Aborting after %d connection failures.'
111 2ef10562 Christos Stavrakakis
                                      % retries)
112 db400d82 Christos Stavrakakis
            return
113 db400d82 Christos Stavrakakis
114 db400d82 Christos Stavrakakis
        # Pick up a host
115 db400d82 Christos Stavrakakis
        host = self.hosts.pop()
116 db400d82 Christos Stavrakakis
        self.hosts.insert(0, host)
117 db400d82 Christos Stavrakakis
118 db400d82 Christos Stavrakakis
        self.client = Client(host, pubacks=self.confirms)
119 db400d82 Christos Stavrakakis
120 db400d82 Christos Stavrakakis
        host = host.split('@')[-1]
121 a8858945 Christos Stavrakakis
        self.log.debug('Connecting to node %s' % host)
122 db400d82 Christos Stavrakakis
123 db400d82 Christos Stavrakakis
        try:
124 db400d82 Christos Stavrakakis
            promise = self.client.connect()
125 db400d82 Christos Stavrakakis
            self.client.wait(promise)
126 db400d82 Christos Stavrakakis
        except socket_error as e:
127 2ef10562 Christos Stavrakakis
            if retries < len(self.hosts):
128 a8858945 Christos Stavrakakis
                self.log.warning('Cannot connect to host %s: %s', host, e)
129 2ef10562 Christos Stavrakakis
            else:
130 a8858945 Christos Stavrakakis
                self.log.error('Cannot connect to host %s: %s', host, e)
131 db400d82 Christos Stavrakakis
                sleep(1)
132 db400d82 Christos Stavrakakis
            return self.connect(retries + 1)
133 db400d82 Christos Stavrakakis
134 a8858945 Christos Stavrakakis
        self.log.info('Successfully connected to host: %s', host)
135 db400d82 Christos Stavrakakis
136 6d27eadd Christos Stavrakakis
        # Setup TCP keepalive option
137 6d27eadd Christos Stavrakakis
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
138 6d27eadd Christos Stavrakakis
        # Keepalive time
139 6d27eadd Christos Stavrakakis
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20)
140 6d27eadd Christos Stavrakakis
        # Keepalive interval
141 6d27eadd Christos Stavrakakis
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2)
142 6d27eadd Christos Stavrakakis
        # Keepalive retry
143 6d27eadd Christos Stavrakakis
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
144 6d27eadd Christos Stavrakakis
145 a8858945 Christos Stavrakakis
        self.log.info('Creating channel')
146 db400d82 Christos Stavrakakis
147 b1bb9251 Christos Stavrakakis
        # Clear consume_promises each time connecting, since they are related
148 b1bb9251 Christos Stavrakakis
        # to the connection object
149 b1bb9251 Christos Stavrakakis
        self.consume_promises = []
150 b1bb9251 Christos Stavrakakis
151 db400d82 Christos Stavrakakis
        if self.unacked:
152 db400d82 Christos Stavrakakis
            self._resend_unacked_messages()
153 db400d82 Christos Stavrakakis
154 db400d82 Christos Stavrakakis
        if self.unsend:
155 db400d82 Christos Stavrakakis
            self._resend_unsend_messages()
156 db400d82 Christos Stavrakakis
157 db400d82 Christos Stavrakakis
        if self.consumers:
158 db400d82 Christos Stavrakakis
            for queue, callback in self.consumers.items():
159 db400d82 Christos Stavrakakis
                self.basic_consume(queue, callback)
160 db400d82 Christos Stavrakakis
161 25649e21 Christos Stavrakakis
        if self.exchanges:
162 25649e21 Christos Stavrakakis
            exchanges = self.exchanges
163 25649e21 Christos Stavrakakis
            self.exchanges = []
164 25649e21 Christos Stavrakakis
            for exchange, type in exchanges:
165 25649e21 Christos Stavrakakis
                self.exchange_declare(exchange, type)
166 25649e21 Christos Stavrakakis
167 b1bb9251 Christos Stavrakakis
    @reconnect_decorator
168 b1bb9251 Christos Stavrakakis
    def reconnect(self):
169 b1bb9251 Christos Stavrakakis
        self.close()
170 b1bb9251 Christos Stavrakakis
        self.connect()
171 b1bb9251 Christos Stavrakakis
172 db400d82 Christos Stavrakakis
    def exchange_declare(self, exchange, type='direct'):
173 db400d82 Christos Stavrakakis
        """Declare an exchange
174 db400d82 Christos Stavrakakis
        @type exchange_name: string
175 db400d82 Christos Stavrakakis
        @param exchange_name: name of the exchange
176 db400d82 Christos Stavrakakis
        @type exchange_type: string
177 db400d82 Christos Stavrakakis
        @param exhange_type: one of 'direct', 'topic', 'fanout'
178 db400d82 Christos Stavrakakis

179 db400d82 Christos Stavrakakis
        """
180 a8858945 Christos Stavrakakis
        self.log.info('Declaring %s exchange: %s', type, exchange)
181 db400d82 Christos Stavrakakis
        promise = self.client.exchange_declare(exchange=exchange,
182 db400d82 Christos Stavrakakis
                                               type=type,
183 db400d82 Christos Stavrakakis
                                               durable=True,
184 db400d82 Christos Stavrakakis
                                               auto_delete=False)
185 db400d82 Christos Stavrakakis
        self.client.wait(promise)
186 25649e21 Christos Stavrakakis
        self.exchanges.append((exchange, type))
187 db400d82 Christos Stavrakakis
188 db400d82 Christos Stavrakakis
    @reconnect_decorator
189 db400d82 Christos Stavrakakis
    def queue_declare(self, queue, exclusive=False,
190 147c3d12 Christos Stavrakakis
                      mirrored=True, mirrored_nodes='all',
191 147c3d12 Christos Stavrakakis
                      dead_letter_exchange=None):
192 db400d82 Christos Stavrakakis
        """Declare a queue
193 db400d82 Christos Stavrakakis

194 db400d82 Christos Stavrakakis
        @type queue: string
195 db400d82 Christos Stavrakakis
        @param queue: name of the queue
196 db400d82 Christos Stavrakakis
        @param mirrored: whether the queue will be mirrored to other brokers
197 db400d82 Christos Stavrakakis
        @param mirrored_nodes: the policy for the mirrored queue.
198 db400d82 Christos Stavrakakis
            Available policies:
199 db400d82 Christos Stavrakakis
                - 'all': The queue is mirrored to all nodes and the
200 db400d82 Christos Stavrakakis
                  master node is the one to which the client is
201 db400d82 Christos Stavrakakis
                  connected
202 db400d82 Christos Stavrakakis
                - a list of nodes. The queue will be mirrored only to
203 db400d82 Christos Stavrakakis
                  the specified nodes, and the master will be the
204 db400d82 Christos Stavrakakis
                  first node in the list. Node names must be provided
205 db400d82 Christos Stavrakakis
                  and not host IP. example: [node1@rabbit,node2@rabbit]
206 db400d82 Christos Stavrakakis

207 db400d82 Christos Stavrakakis
        """
208 a8858945 Christos Stavrakakis
        self.log.info('Declaring queue: %s', queue)
209 db400d82 Christos Stavrakakis
210 db400d82 Christos Stavrakakis
        if mirrored:
211 db400d82 Christos Stavrakakis
            if mirrored_nodes == 'all':
212 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'all'}
213 db400d82 Christos Stavrakakis
            elif isinstance(mirrored_nodes, list):
214 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'nodes',
215 74d988b0 Christos Stavrakakis
                             'x-ha-policy-params': mirrored_nodes}
216 db400d82 Christos Stavrakakis
            else:
217 db400d82 Christos Stavrakakis
                raise AttributeError
218 db400d82 Christos Stavrakakis
        else:
219 db400d82 Christos Stavrakakis
            arguments = {}
220 db400d82 Christos Stavrakakis
221 147c3d12 Christos Stavrakakis
        if dead_letter_exchange:
222 147c3d12 Christos Stavrakakis
            arguments['x-dead-letter-exchange'] = dead_letter_exchange
223 147c3d12 Christos Stavrakakis
224 db400d82 Christos Stavrakakis
        promise = self.client.queue_declare(queue=queue, durable=True,
225 db400d82 Christos Stavrakakis
                                            exclusive=exclusive,
226 db400d82 Christos Stavrakakis
                                            auto_delete=False,
227 db400d82 Christos Stavrakakis
                                            arguments=arguments)
228 db400d82 Christos Stavrakakis
        self.client.wait(promise)
229 db400d82 Christos Stavrakakis
230 db400d82 Christos Stavrakakis
    def queue_bind(self, queue, exchange, routing_key):
231 a8858945 Christos Stavrakakis
        self.log.debug('Binding queue %s to exchange %s with key %s'
232 a8858945 Christos Stavrakakis
                       % (queue, exchange, routing_key))
233 db400d82 Christos Stavrakakis
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
234 db400d82 Christos Stavrakakis
                                         routing_key=routing_key)
235 db400d82 Christos Stavrakakis
        self.client.wait(promise)
236 db400d82 Christos Stavrakakis
237 db400d82 Christos Stavrakakis
    @reconnect_decorator
238 147c3d12 Christos Stavrakakis
    def basic_publish(self, exchange, routing_key, body, headers={}):
239 db400d82 Christos Stavrakakis
        """Publish a message with a specific routing key """
240 147c3d12 Christos Stavrakakis
        self._publish(exchange, routing_key, body, headers)
241 db400d82 Christos Stavrakakis
242 db400d82 Christos Stavrakakis
        self.flush_buffer()
243 db400d82 Christos Stavrakakis
244 db400d82 Christos Stavrakakis
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
245 db400d82 Christos Stavrakakis
            self.get_confirms()
246 db400d82 Christos Stavrakakis
247 db400d82 Christos Stavrakakis
    @reconnect_decorator
248 db400d82 Christos Stavrakakis
    def basic_publish_multi(self, exchange, routing_key, bodies):
249 db400d82 Christos Stavrakakis
        for body in bodies:
250 db400d82 Christos Stavrakakis
            self.unsend[body] = (exchange, routing_key)
251 db400d82 Christos Stavrakakis
252 db400d82 Christos Stavrakakis
        for body in bodies:
253 db400d82 Christos Stavrakakis
            self._publish(exchange, routing_key, body)
254 db400d82 Christos Stavrakakis
            self.unsend.pop(body)
255 db400d82 Christos Stavrakakis
256 db400d82 Christos Stavrakakis
        self.flush_buffer()
257 db400d82 Christos Stavrakakis
258 db400d82 Christos Stavrakakis
        if self.confirms:
259 db400d82 Christos Stavrakakis
            self.get_confirms()
260 db400d82 Christos Stavrakakis
261 147c3d12 Christos Stavrakakis
    def _publish(self, exchange, routing_key, body, headers={}):
262 db400d82 Christos Stavrakakis
        # Persisent messages by default!
263 db400d82 Christos Stavrakakis
        headers['delivery_mode'] = 2
264 db400d82 Christos Stavrakakis
        promise = self.client.basic_publish(exchange=exchange,
265 db400d82 Christos Stavrakakis
                                            routing_key=routing_key,
266 db400d82 Christos Stavrakakis
                                            body=body, headers=headers)
267 db400d82 Christos Stavrakakis
268 db400d82 Christos Stavrakakis
        if self.confirms:
269 db400d82 Christos Stavrakakis
            self.unacked[promise] = (exchange, routing_key, body)
270 db400d82 Christos Stavrakakis
271 db400d82 Christos Stavrakakis
        return promise
272 db400d82 Christos Stavrakakis
273 db400d82 Christos Stavrakakis
    @reconnect_decorator
274 db400d82 Christos Stavrakakis
    def flush_buffer(self):
275 db400d82 Christos Stavrakakis
        while self.client.needs_write():
276 db400d82 Christos Stavrakakis
            self.client.on_write()
277 db400d82 Christos Stavrakakis
278 db400d82 Christos Stavrakakis
    @reconnect_decorator
279 db400d82 Christos Stavrakakis
    def get_confirms(self):
280 db400d82 Christos Stavrakakis
        for promise in self.unacked.keys():
281 db400d82 Christos Stavrakakis
            self.client.wait(promise)
282 db400d82 Christos Stavrakakis
            self.unacked.pop(promise)
283 db400d82 Christos Stavrakakis
284 db400d82 Christos Stavrakakis
    @reconnect_decorator
285 db400d82 Christos Stavrakakis
    def _resend_unacked_messages(self):
286 db400d82 Christos Stavrakakis
        """Resend unacked messages in case of a connection failure."""
287 db400d82 Christos Stavrakakis
        msgs = self.unacked.values()
288 db400d82 Christos Stavrakakis
        self.unacked.clear()
289 db400d82 Christos Stavrakakis
        for exchange, routing_key, body in msgs:
290 a8858945 Christos Stavrakakis
            self.log.debug('Resending message %s' % body)
291 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
292 db400d82 Christos Stavrakakis
293 db400d82 Christos Stavrakakis
    @reconnect_decorator
294 db400d82 Christos Stavrakakis
    def _resend_unsend_messages(self):
295 db400d82 Christos Stavrakakis
        """Resend unsend messages in case of a connection failure."""
296 db400d82 Christos Stavrakakis
        for body in self.unsend.keys():
297 db400d82 Christos Stavrakakis
            (exchange, routing_key) = self.unsend[body]
298 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
299 db400d82 Christos Stavrakakis
            self.unsend.pop(body)
300 db400d82 Christos Stavrakakis
301 db400d82 Christos Stavrakakis
    @reconnect_decorator
302 db400d82 Christos Stavrakakis
    def basic_consume(self, queue, callback, prefetch_count=0):
303 db400d82 Christos Stavrakakis
        """Consume from a queue.
304 db400d82 Christos Stavrakakis

305 db400d82 Christos Stavrakakis
        @type queue: string or list of strings
306 db400d82 Christos Stavrakakis
        @param queue: the name or list of names from the queues to consume
307 db400d82 Christos Stavrakakis
        @type callback: function
308 db400d82 Christos Stavrakakis
        @param callback: the callback function to run when a message arrives
309 db400d82 Christos Stavrakakis

310 db400d82 Christos Stavrakakis
        """
311 db400d82 Christos Stavrakakis
        # Store the queues and the callback
312 db400d82 Christos Stavrakakis
        self.consumers[queue] = callback
313 db400d82 Christos Stavrakakis
314 db400d82 Christos Stavrakakis
        def handle_delivery(promise, msg):
315 db400d82 Christos Stavrakakis
            """Hide promises and messages without body"""
316 db400d82 Christos Stavrakakis
            if 'body' in msg:
317 db400d82 Christos Stavrakakis
                callback(self, msg)
318 db400d82 Christos Stavrakakis
            else:
319 a8858945 Christos Stavrakakis
                self.log.debug("Message without body %s" % msg)
320 db400d82 Christos Stavrakakis
                raise socket_error
321 db400d82 Christos Stavrakakis
322 db400d82 Christos Stavrakakis
        consume_promise = \
323 74d988b0 Christos Stavrakakis
            self.client.basic_consume(queue=queue,
324 74d988b0 Christos Stavrakakis
                                      prefetch_count=prefetch_count,
325 74d988b0 Christos Stavrakakis
                                      callback=handle_delivery)
326 db400d82 Christos Stavrakakis
327 db400d82 Christos Stavrakakis
        self.consume_promises.append(consume_promise)
328 db400d82 Christos Stavrakakis
        return consume_promise
329 db400d82 Christos Stavrakakis
330 db400d82 Christos Stavrakakis
    @reconnect_decorator
331 db400d82 Christos Stavrakakis
    def basic_wait(self, promise=None, timeout=0):
332 db400d82 Christos Stavrakakis
        """Wait for messages from the queues declared by basic_consume.
333 db400d82 Christos Stavrakakis

334 db400d82 Christos Stavrakakis
        This function will block until a message arrives from the queues that
335 db400d82 Christos Stavrakakis
        have been declared with basic_consume. If the optional arguments
336 db400d82 Christos Stavrakakis
        'promise' is given, only messages for this promise will be delivered.
337 db400d82 Christos Stavrakakis

338 db400d82 Christos Stavrakakis
        """
339 db400d82 Christos Stavrakakis
        if promise is not None:
340 b1bb9251 Christos Stavrakakis
            return self.client.wait(promise, timeout)
341 db400d82 Christos Stavrakakis
        else:
342 b1bb9251 Christos Stavrakakis
            return self.client.wait(self.consume_promises, timeout)
343 db400d82 Christos Stavrakakis
344 db400d82 Christos Stavrakakis
    @reconnect_decorator
345 db400d82 Christos Stavrakakis
    def basic_get(self, queue):
346 db400d82 Christos Stavrakakis
        """Get a single message from a queue.
347 db400d82 Christos Stavrakakis

348 db400d82 Christos Stavrakakis
        This is a non-blocking method for getting messages from a queue.
349 db400d82 Christos Stavrakakis
        It will return None if the queue is empty.
350 db400d82 Christos Stavrakakis

351 db400d82 Christos Stavrakakis
        """
352 db400d82 Christos Stavrakakis
        get_promise = self.client.basic_get(queue=queue)
353 db400d82 Christos Stavrakakis
        result = self.client.wait(get_promise)
354 db400d82 Christos Stavrakakis
        if 'empty' in result:
355 db400d82 Christos Stavrakakis
            # The queue is empty
356 db400d82 Christos Stavrakakis
            return None
357 db400d82 Christos Stavrakakis
        else:
358 db400d82 Christos Stavrakakis
            return result
359 db400d82 Christos Stavrakakis
360 db400d82 Christos Stavrakakis
    @reconnect_decorator
361 db400d82 Christos Stavrakakis
    def basic_ack(self, message):
362 db400d82 Christos Stavrakakis
        self.client.basic_ack(message)
363 db400d82 Christos Stavrakakis
364 db400d82 Christos Stavrakakis
    @reconnect_decorator
365 db400d82 Christos Stavrakakis
    def basic_nack(self, message):
366 257b694d Christos Stavrakakis
        self.client.basic_ack(message)
367 257b694d Christos Stavrakakis
368 257b694d Christos Stavrakakis
    @reconnect_decorator
369 257b694d Christos Stavrakakis
    def basic_reject(self, message, requeue=False):
370 257b694d Christos Stavrakakis
        """Reject a message.
371 257b694d Christos Stavrakakis

372 257b694d Christos Stavrakakis
        If requeue option is False and a dead letter exchange is associated
373 257b694d Christos Stavrakakis
        with the queue, the message will be routed to the dead letter exchange.
374 257b694d Christos Stavrakakis

375 257b694d Christos Stavrakakis
        """
376 257b694d Christos Stavrakakis
        self.client.basic_reject(message, requeue=requeue)
377 db400d82 Christos Stavrakakis
378 db400d82 Christos Stavrakakis
    def close(self):
379 db400d82 Christos Stavrakakis
        """Check that messages have been send and close the connection."""
380 a8858945 Christos Stavrakakis
        self.log.debug("Closing connection to %s", self.client.host)
381 db400d82 Christos Stavrakakis
        try:
382 db400d82 Christos Stavrakakis
            if self.confirms:
383 db400d82 Christos Stavrakakis
                self.get_confirms()
384 db400d82 Christos Stavrakakis
            close_promise = self.client.close()
385 db400d82 Christos Stavrakakis
            self.client.wait(close_promise)
386 db400d82 Christos Stavrakakis
        except (socket_error, spec_exceptions.ConnectionForced) as e:
387 a8858945 Christos Stavrakakis
            self.log.error('Connection closed while closing connection:%s', e)
388 db400d82 Christos Stavrakakis
389 db400d82 Christos Stavrakakis
    def queue_delete(self, queue, if_unused=True, if_empty=True):
390 db400d82 Christos Stavrakakis
        """Delete a queue.
391 db400d82 Christos Stavrakakis

392 db400d82 Christos Stavrakakis
        Returns False if the queue does not exist
393 db400d82 Christos Stavrakakis
        """
394 db400d82 Christos Stavrakakis
        try:
395 db400d82 Christos Stavrakakis
            promise = self.client.queue_delete(queue=queue,
396 db400d82 Christos Stavrakakis
                                               if_unused=if_unused,
397 db400d82 Christos Stavrakakis
                                               if_empty=if_empty)
398 db400d82 Christos Stavrakakis
            self.client.wait(promise)
399 db400d82 Christos Stavrakakis
            return True
400 db400d82 Christos Stavrakakis
        except spec_exceptions.NotFound:
401 a8858945 Christos Stavrakakis
            self.log.info("Queue %s does not exist", queue)
402 db400d82 Christos Stavrakakis
            return False
403 db400d82 Christos Stavrakakis
404 db400d82 Christos Stavrakakis
    def exchange_delete(self, exchange, if_unused=True):
405 db400d82 Christos Stavrakakis
        """Delete an exchange."""
406 db400d82 Christos Stavrakakis
        try:
407 db400d82 Christos Stavrakakis
408 db400d82 Christos Stavrakakis
            promise = self.client.exchange_delete(exchange=exchange,
409 db400d82 Christos Stavrakakis
                                                  if_unused=if_unused)
410 db400d82 Christos Stavrakakis
            self.client.wait(promise)
411 db400d82 Christos Stavrakakis
            return True
412 db400d82 Christos Stavrakakis
        except spec_exceptions.NotFound:
413 a8858945 Christos Stavrakakis
            self.log.info("Exchange %s does not exist", exchange)
414 db400d82 Christos Stavrakakis
            return False
415 db400d82 Christos Stavrakakis
416 db400d82 Christos Stavrakakis
    @reconnect_decorator
417 db400d82 Christos Stavrakakis
    def basic_cancel(self, promise=None):
418 db400d82 Christos Stavrakakis
        """Cancel consuming from a queue. """
419 db400d82 Christos Stavrakakis
        if promise is not None:
420 db400d82 Christos Stavrakakis
            self.client.basic_cancel(promise)
421 db400d82 Christos Stavrakakis
        else:
422 db400d82 Christos Stavrakakis
            for promise in self.consume_promises:
423 db400d82 Christos Stavrakakis
                self.client.basic_cancel(promise)
424 db400d82 Christos Stavrakakis
425 db400d82 Christos Stavrakakis
426 b537ac01 Christos Stavrakakis
class AMQPConnectionError(Exception):
427 db400d82 Christos Stavrakakis
    pass