Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ d01cd522

History | View | Annotate | Download (13.4 kB)

1 db400d82 Christos Stavrakakis
# Copyright 2012 GRNET S.A. All rights reserved.
2 db400d82 Christos Stavrakakis
#
3 db400d82 Christos Stavrakakis
# Redistribution and use in source and binary forms, with or
4 db400d82 Christos Stavrakakis
# without modification, are permitted provided that the following
5 db400d82 Christos Stavrakakis
# conditions are met:
6 db400d82 Christos Stavrakakis
#
7 db400d82 Christos Stavrakakis
#   1. Redistributions of source code must retain the above
8 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
9 db400d82 Christos Stavrakakis
#      disclaimer.
10 db400d82 Christos Stavrakakis
#
11 db400d82 Christos Stavrakakis
#   2. Redistributions in binary form must reproduce the above
12 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
13 db400d82 Christos Stavrakakis
#      disclaimer in the documentation and/or other materials
14 db400d82 Christos Stavrakakis
#      provided with the distribution.
15 db400d82 Christos Stavrakakis
#
16 db400d82 Christos Stavrakakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 db400d82 Christos Stavrakakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 db400d82 Christos Stavrakakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 db400d82 Christos Stavrakakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 db400d82 Christos Stavrakakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 db400d82 Christos Stavrakakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 db400d82 Christos Stavrakakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 db400d82 Christos Stavrakakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 db400d82 Christos Stavrakakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 db400d82 Christos Stavrakakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 db400d82 Christos Stavrakakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 db400d82 Christos Stavrakakis
# POSSIBILITY OF SUCH DAMAGE.
28 db400d82 Christos Stavrakakis
#
29 db400d82 Christos Stavrakakis
# The views and conclusions contained in the software and
30 db400d82 Christos Stavrakakis
# documentation are those of the authors and should not be
31 db400d82 Christos Stavrakakis
# interpreted as representing official policies, either expressed
32 db400d82 Christos Stavrakakis
# or implied, of GRNET S.A.
33 db400d82 Christos Stavrakakis
34 db400d82 Christos Stavrakakis
""" Module implementing connection and communication with an AMQP broker.
35 db400d82 Christos Stavrakakis

36 db400d82 Christos Stavrakakis
AMQP Client's implemented by this module silenty detect connection failures and
37 db400d82 Christos Stavrakakis
try to reconnect to any available broker. Also publishing takes advantage of
38 db400d82 Christos Stavrakakis
publisher-confirms in order to guarantee that messages are properly delivered
39 db400d82 Christos Stavrakakis
to the broker.
40 db400d82 Christos Stavrakakis

41 db400d82 Christos Stavrakakis
"""
42 db400d82 Christos Stavrakakis
43 db400d82 Christos Stavrakakis
import logging
44 db400d82 Christos Stavrakakis
45 db400d82 Christos Stavrakakis
from puka import Client
46 db400d82 Christos Stavrakakis
from puka import spec_exceptions
47 db400d82 Christos Stavrakakis
from socket import error as socket_error
48 db400d82 Christos Stavrakakis
from time import sleep
49 db400d82 Christos Stavrakakis
from random import shuffle
50 db400d82 Christos Stavrakakis
from functools import wraps
51 db400d82 Christos Stavrakakis
from ordereddict import OrderedDict
52 db400d82 Christos Stavrakakis
from synnefo import settings
53 db400d82 Christos Stavrakakis
54 db400d82 Christos Stavrakakis
logger = logging.getLogger()
55 db400d82 Christos Stavrakakis
56 db400d82 Christos Stavrakakis
57 db400d82 Christos Stavrakakis
def reconnect_decorator(func):
58 db400d82 Christos Stavrakakis
    """
59 db400d82 Christos Stavrakakis
    Decorator for persistent connection with one or more AMQP brokers.
60 db400d82 Christos Stavrakakis

61 db400d82 Christos Stavrakakis
    """
62 db400d82 Christos Stavrakakis
    @wraps(func)
63 db400d82 Christos Stavrakakis
    def wrapper(self, *args, **kwargs):
64 db400d82 Christos Stavrakakis
        try:
65 e0b68525 Christos Stavrakakis
            return func(self, *args, **kwargs)
66 db400d82 Christos Stavrakakis
        except (socket_error, spec_exceptions.ConnectionForced) as e:
67 db400d82 Christos Stavrakakis
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
68 db400d82 Christos Stavrakakis
            self.consume_promises = []
69 db400d82 Christos Stavrakakis
            self.connect()
70 db400d82 Christos Stavrakakis
71 db400d82 Christos Stavrakakis
    return wrapper
72 db400d82 Christos Stavrakakis
73 db400d82 Christos Stavrakakis
74 db400d82 Christos Stavrakakis
class AMQPPukaClient(object):
75 db400d82 Christos Stavrakakis
    """
76 db400d82 Christos Stavrakakis
    AMQP generic client implementing most of the basic AMQP operations.
77 db400d82 Christos Stavrakakis

78 db400d82 Christos Stavrakakis
    """
79 db400d82 Christos Stavrakakis
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
80 db400d82 Christos Stavrakakis
                 confirms=True, confirm_buffer=100):
81 2ef10562 Christos Stavrakakis
        """
82 2ef10562 Christos Stavrakakis
        Format hosts as "amqp://username:pass@host:port"
83 2ef10562 Christos Stavrakakis
        max_retries=0 defaults to unlimited retries
84 2ef10562 Christos Stavrakakis

85 2ef10562 Christos Stavrakakis
        """
86 db400d82 Christos Stavrakakis
87 db400d82 Christos Stavrakakis
        self.hosts = hosts
88 db400d82 Christos Stavrakakis
        shuffle(self.hosts)
89 db400d82 Christos Stavrakakis
90 db400d82 Christos Stavrakakis
        self.max_retries = max_retries
91 db400d82 Christos Stavrakakis
        self.confirms = confirms
92 db400d82 Christos Stavrakakis
        self.confirm_buffer = confirm_buffer
93 db400d82 Christos Stavrakakis
94 db400d82 Christos Stavrakakis
        self.connection = None
95 db400d82 Christos Stavrakakis
        self.channel = None
96 db400d82 Christos Stavrakakis
        self.consumers = {}
97 db400d82 Christos Stavrakakis
        self.unacked = OrderedDict()
98 db400d82 Christos Stavrakakis
        self.unsend = OrderedDict()
99 db400d82 Christos Stavrakakis
        self.consume_promises = []
100 25649e21 Christos Stavrakakis
        self.exchanges = []
101 db400d82 Christos Stavrakakis
102 db400d82 Christos Stavrakakis
    def connect(self, retries=0):
103 2ef10562 Christos Stavrakakis
        if self.max_retries and retries >= self.max_retries:
104 2ef10562 Christos Stavrakakis
            logger.error("Aborting after %d retries", retries)
105 db400d82 Christos Stavrakakis
            raise AMQPConnectionError('Aborting after %d connection failures.'\
106 2ef10562 Christos Stavrakakis
                                      % retries)
107 db400d82 Christos Stavrakakis
            return
108 db400d82 Christos Stavrakakis
109 db400d82 Christos Stavrakakis
        # Pick up a host
110 db400d82 Christos Stavrakakis
        host = self.hosts.pop()
111 db400d82 Christos Stavrakakis
        self.hosts.insert(0, host)
112 db400d82 Christos Stavrakakis
113 db400d82 Christos Stavrakakis
        self.client = Client(host, pubacks=self.confirms)
114 db400d82 Christos Stavrakakis
115 db400d82 Christos Stavrakakis
        host = host.split('@')[-1]
116 db400d82 Christos Stavrakakis
        logger.debug('Connecting to node %s' % host)
117 db400d82 Christos Stavrakakis
118 db400d82 Christos Stavrakakis
        try:
119 db400d82 Christos Stavrakakis
            promise = self.client.connect()
120 db400d82 Christos Stavrakakis
            self.client.wait(promise)
121 db400d82 Christos Stavrakakis
        except socket_error as e:
122 2ef10562 Christos Stavrakakis
            if retries < len(self.hosts):
123 2ef10562 Christos Stavrakakis
                logger.warning('Cannot connect to host %s: %s', host, e)
124 2ef10562 Christos Stavrakakis
            else:
125 2ef10562 Christos Stavrakakis
                logger.error('Cannot connect to host %s: %s', host, e)
126 db400d82 Christos Stavrakakis
                sleep(1)
127 db400d82 Christos Stavrakakis
            return self.connect(retries + 1)
128 db400d82 Christos Stavrakakis
129 db400d82 Christos Stavrakakis
        logger.info('Successfully connected to host: %s', host)
130 db400d82 Christos Stavrakakis
131 db400d82 Christos Stavrakakis
        logger.info('Creating channel')
132 db400d82 Christos Stavrakakis
133 db400d82 Christos Stavrakakis
        if self.unacked:
134 db400d82 Christos Stavrakakis
            self._resend_unacked_messages()
135 db400d82 Christos Stavrakakis
136 db400d82 Christos Stavrakakis
        if self.unsend:
137 db400d82 Christos Stavrakakis
            self._resend_unsend_messages()
138 db400d82 Christos Stavrakakis
139 db400d82 Christos Stavrakakis
        if self.consumers:
140 db400d82 Christos Stavrakakis
            for queue, callback in self.consumers.items():
141 db400d82 Christos Stavrakakis
                self.basic_consume(queue, callback)
142 db400d82 Christos Stavrakakis
143 25649e21 Christos Stavrakakis
        if self.exchanges:
144 25649e21 Christos Stavrakakis
            exchanges = self.exchanges
145 25649e21 Christos Stavrakakis
            self.exchanges = []
146 25649e21 Christos Stavrakakis
            for exchange, type in exchanges:
147 25649e21 Christos Stavrakakis
                self.exchange_declare(exchange, type)
148 25649e21 Christos Stavrakakis
149 db400d82 Christos Stavrakakis
    def exchange_declare(self, exchange, type='direct'):
150 db400d82 Christos Stavrakakis
        """Declare an exchange
151 db400d82 Christos Stavrakakis
        @type exchange_name: string
152 db400d82 Christos Stavrakakis
        @param exchange_name: name of the exchange
153 db400d82 Christos Stavrakakis
        @type exchange_type: string
154 db400d82 Christos Stavrakakis
        @param exhange_type: one of 'direct', 'topic', 'fanout'
155 db400d82 Christos Stavrakakis

156 db400d82 Christos Stavrakakis
        """
157 db400d82 Christos Stavrakakis
        logger.info('Declaring %s exchange: %s', type, exchange)
158 db400d82 Christos Stavrakakis
        promise = self.client.exchange_declare(exchange=exchange,
159 db400d82 Christos Stavrakakis
                                               type=type,
160 db400d82 Christos Stavrakakis
                                               durable=True,
161 db400d82 Christos Stavrakakis
                                               auto_delete=False)
162 db400d82 Christos Stavrakakis
        self.client.wait(promise)
163 25649e21 Christos Stavrakakis
        self.exchanges.append((exchange, type))
164 db400d82 Christos Stavrakakis
165 db400d82 Christos Stavrakakis
    @reconnect_decorator
166 db400d82 Christos Stavrakakis
    def queue_declare(self, queue, exclusive=False,
167 db400d82 Christos Stavrakakis
                      mirrored=True, mirrored_nodes='all'):
168 db400d82 Christos Stavrakakis
        """Declare a queue
169 db400d82 Christos Stavrakakis

170 db400d82 Christos Stavrakakis
        @type queue: string
171 db400d82 Christos Stavrakakis
        @param queue: name of the queue
172 db400d82 Christos Stavrakakis
        @param mirrored: whether the queue will be mirrored to other brokers
173 db400d82 Christos Stavrakakis
        @param mirrored_nodes: the policy for the mirrored queue.
174 db400d82 Christos Stavrakakis
            Available policies:
175 db400d82 Christos Stavrakakis
                - 'all': The queue is mirrored to all nodes and the
176 db400d82 Christos Stavrakakis
                  master node is the one to which the client is
177 db400d82 Christos Stavrakakis
                  connected
178 db400d82 Christos Stavrakakis
                - a list of nodes. The queue will be mirrored only to
179 db400d82 Christos Stavrakakis
                  the specified nodes, and the master will be the
180 db400d82 Christos Stavrakakis
                  first node in the list. Node names must be provided
181 db400d82 Christos Stavrakakis
                  and not host IP. example: [node1@rabbit,node2@rabbit]
182 db400d82 Christos Stavrakakis

183 db400d82 Christos Stavrakakis
        """
184 db400d82 Christos Stavrakakis
        logger.info('Declaring queue: %s', queue)
185 db400d82 Christos Stavrakakis
186 db400d82 Christos Stavrakakis
        if mirrored:
187 db400d82 Christos Stavrakakis
            if mirrored_nodes == 'all':
188 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'all'}
189 db400d82 Christos Stavrakakis
            elif isinstance(mirrored_nodes, list):
190 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'nodes',
191 db400d82 Christos Stavrakakis
                           'x-ha-policy-params': mirrored_nodes}
192 db400d82 Christos Stavrakakis
            else:
193 db400d82 Christos Stavrakakis
                raise AttributeError
194 db400d82 Christos Stavrakakis
        else:
195 db400d82 Christos Stavrakakis
            arguments = {}
196 db400d82 Christos Stavrakakis
197 db400d82 Christos Stavrakakis
        promise = self.client.queue_declare(queue=queue, durable=True,
198 db400d82 Christos Stavrakakis
                                            exclusive=exclusive,
199 db400d82 Christos Stavrakakis
                                            auto_delete=False,
200 db400d82 Christos Stavrakakis
                                            arguments=arguments)
201 db400d82 Christos Stavrakakis
        self.client.wait(promise)
202 db400d82 Christos Stavrakakis
203 db400d82 Christos Stavrakakis
    def queue_bind(self, queue, exchange, routing_key):
204 db400d82 Christos Stavrakakis
        logger.debug('Binding queue %s to exchange %s with key %s'
205 db400d82 Christos Stavrakakis
                 % (queue, exchange, routing_key))
206 db400d82 Christos Stavrakakis
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
207 db400d82 Christos Stavrakakis
                                         routing_key=routing_key)
208 db400d82 Christos Stavrakakis
        self.client.wait(promise)
209 db400d82 Christos Stavrakakis
210 db400d82 Christos Stavrakakis
    @reconnect_decorator
211 db400d82 Christos Stavrakakis
    def basic_publish(self, exchange, routing_key, body):
212 db400d82 Christos Stavrakakis
        """Publish a message with a specific routing key """
213 db400d82 Christos Stavrakakis
        self._publish(exchange, routing_key, body)
214 db400d82 Christos Stavrakakis
215 db400d82 Christos Stavrakakis
        self.flush_buffer()
216 db400d82 Christos Stavrakakis
217 db400d82 Christos Stavrakakis
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
218 db400d82 Christos Stavrakakis
            self.get_confirms()
219 db400d82 Christos Stavrakakis
220 db400d82 Christos Stavrakakis
    @reconnect_decorator
221 db400d82 Christos Stavrakakis
    def basic_publish_multi(self, exchange, routing_key, bodies):
222 db400d82 Christos Stavrakakis
        for body in bodies:
223 db400d82 Christos Stavrakakis
            self.unsend[body] = (exchange, routing_key)
224 db400d82 Christos Stavrakakis
225 db400d82 Christos Stavrakakis
        for body in bodies:
226 db400d82 Christos Stavrakakis
            self._publish(exchange, routing_key, body)
227 db400d82 Christos Stavrakakis
            self.unsend.pop(body)
228 db400d82 Christos Stavrakakis
229 db400d82 Christos Stavrakakis
        self.flush_buffer()
230 db400d82 Christos Stavrakakis
231 db400d82 Christos Stavrakakis
        if self.confirms:
232 db400d82 Christos Stavrakakis
            self.get_confirms()
233 db400d82 Christos Stavrakakis
234 db400d82 Christos Stavrakakis
    def _publish(self, exchange, routing_key, body):
235 db400d82 Christos Stavrakakis
        # Persisent messages by default!
236 db400d82 Christos Stavrakakis
        headers = {}
237 db400d82 Christos Stavrakakis
        headers['delivery_mode'] = 2
238 db400d82 Christos Stavrakakis
        promise = self.client.basic_publish(exchange=exchange,
239 db400d82 Christos Stavrakakis
                                            routing_key=routing_key,
240 db400d82 Christos Stavrakakis
                                            body=body, headers=headers)
241 db400d82 Christos Stavrakakis
242 db400d82 Christos Stavrakakis
        if self.confirms:
243 db400d82 Christos Stavrakakis
            self.unacked[promise] = (exchange, routing_key, body)
244 db400d82 Christos Stavrakakis
245 db400d82 Christos Stavrakakis
        return promise
246 db400d82 Christos Stavrakakis
247 db400d82 Christos Stavrakakis
    @reconnect_decorator
248 db400d82 Christos Stavrakakis
    def flush_buffer(self):
249 db400d82 Christos Stavrakakis
        while self.client.needs_write():
250 db400d82 Christos Stavrakakis
            self.client.on_write()
251 db400d82 Christos Stavrakakis
252 db400d82 Christos Stavrakakis
    @reconnect_decorator
253 db400d82 Christos Stavrakakis
    def get_confirms(self):
254 db400d82 Christos Stavrakakis
        for promise in self.unacked.keys():
255 db400d82 Christos Stavrakakis
            self.client.wait(promise)
256 db400d82 Christos Stavrakakis
            self.unacked.pop(promise)
257 db400d82 Christos Stavrakakis
258 db400d82 Christos Stavrakakis
    @reconnect_decorator
259 db400d82 Christos Stavrakakis
    def _resend_unacked_messages(self):
260 db400d82 Christos Stavrakakis
        """Resend unacked messages in case of a connection failure."""
261 db400d82 Christos Stavrakakis
        msgs = self.unacked.values()
262 db400d82 Christos Stavrakakis
        self.unacked.clear()
263 db400d82 Christos Stavrakakis
        for exchange, routing_key, body in msgs:
264 db400d82 Christos Stavrakakis
            logger.debug('Resending message %s' % body)
265 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
266 db400d82 Christos Stavrakakis
267 db400d82 Christos Stavrakakis
    @reconnect_decorator
268 db400d82 Christos Stavrakakis
    def _resend_unsend_messages(self):
269 db400d82 Christos Stavrakakis
        """Resend unsend messages in case of a connection failure."""
270 db400d82 Christos Stavrakakis
        for body in self.unsend.keys():
271 db400d82 Christos Stavrakakis
            (exchange, routing_key) = self.unsend[body]
272 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
273 db400d82 Christos Stavrakakis
            self.unsend.pop(body)
274 db400d82 Christos Stavrakakis
275 db400d82 Christos Stavrakakis
    @reconnect_decorator
276 db400d82 Christos Stavrakakis
    def basic_consume(self, queue, callback, prefetch_count=0):
277 db400d82 Christos Stavrakakis
        """Consume from a queue.
278 db400d82 Christos Stavrakakis

279 db400d82 Christos Stavrakakis
        @type queue: string or list of strings
280 db400d82 Christos Stavrakakis
        @param queue: the name or list of names from the queues to consume
281 db400d82 Christos Stavrakakis
        @type callback: function
282 db400d82 Christos Stavrakakis
        @param callback: the callback function to run when a message arrives
283 db400d82 Christos Stavrakakis

284 db400d82 Christos Stavrakakis
        """
285 db400d82 Christos Stavrakakis
        # Store the queues and the callback
286 db400d82 Christos Stavrakakis
        self.consumers[queue] = callback
287 db400d82 Christos Stavrakakis
288 db400d82 Christos Stavrakakis
        def handle_delivery(promise, msg):
289 db400d82 Christos Stavrakakis
            """Hide promises and messages without body"""
290 db400d82 Christos Stavrakakis
            if 'body' in msg:
291 db400d82 Christos Stavrakakis
                callback(self, msg)
292 db400d82 Christos Stavrakakis
            else:
293 db400d82 Christos Stavrakakis
                logger.debug("Message without body %s" % msg)
294 db400d82 Christos Stavrakakis
                raise socket_error
295 db400d82 Christos Stavrakakis
296 db400d82 Christos Stavrakakis
        consume_promise = \
297 db400d82 Christos Stavrakakis
                self.client.basic_consume(queue=queue,
298 db400d82 Christos Stavrakakis
                                          prefetch_count=prefetch_count,
299 db400d82 Christos Stavrakakis
                                          callback=handle_delivery)
300 db400d82 Christos Stavrakakis
301 db400d82 Christos Stavrakakis
        self.consume_promises.append(consume_promise)
302 db400d82 Christos Stavrakakis
        return consume_promise
303 db400d82 Christos Stavrakakis
304 db400d82 Christos Stavrakakis
    @reconnect_decorator
305 db400d82 Christos Stavrakakis
    def basic_wait(self, promise=None, timeout=0):
306 db400d82 Christos Stavrakakis
        """Wait for messages from the queues declared by basic_consume.
307 db400d82 Christos Stavrakakis

308 db400d82 Christos Stavrakakis
        This function will block until a message arrives from the queues that
309 db400d82 Christos Stavrakakis
        have been declared with basic_consume. If the optional arguments
310 db400d82 Christos Stavrakakis
        'promise' is given, only messages for this promise will be delivered.
311 db400d82 Christos Stavrakakis

312 db400d82 Christos Stavrakakis
        """
313 db400d82 Christos Stavrakakis
        if promise is not None:
314 db400d82 Christos Stavrakakis
            self.client.wait(promise, timeout)
315 db400d82 Christos Stavrakakis
        else:
316 db400d82 Christos Stavrakakis
            self.client.wait(self.consume_promises)
317 db400d82 Christos Stavrakakis
318 db400d82 Christos Stavrakakis
    @reconnect_decorator
319 db400d82 Christos Stavrakakis
    def basic_get(self, queue):
320 db400d82 Christos Stavrakakis
        """Get a single message from a queue.
321 db400d82 Christos Stavrakakis

322 db400d82 Christos Stavrakakis
        This is a non-blocking method for getting messages from a queue.
323 db400d82 Christos Stavrakakis
        It will return None if the queue is empty.
324 db400d82 Christos Stavrakakis

325 db400d82 Christos Stavrakakis
        """
326 db400d82 Christos Stavrakakis
        get_promise = self.client.basic_get(queue=queue)
327 db400d82 Christos Stavrakakis
        result = self.client.wait(get_promise)
328 db400d82 Christos Stavrakakis
        if 'empty' in result:
329 db400d82 Christos Stavrakakis
            # The queue is empty
330 db400d82 Christos Stavrakakis
            return None
331 db400d82 Christos Stavrakakis
        else:
332 db400d82 Christos Stavrakakis
            return result
333 db400d82 Christos Stavrakakis
334 db400d82 Christos Stavrakakis
    @reconnect_decorator
335 db400d82 Christos Stavrakakis
    def basic_ack(self, message):
336 db400d82 Christos Stavrakakis
        self.client.basic_ack(message)
337 db400d82 Christos Stavrakakis
338 db400d82 Christos Stavrakakis
    @reconnect_decorator
339 db400d82 Christos Stavrakakis
    def basic_nack(self, message):
340 db400d82 Christos Stavrakakis
        #TODO:
341 db400d82 Christos Stavrakakis
        pass
342 db400d82 Christos Stavrakakis
343 db400d82 Christos Stavrakakis
    def close(self):
344 db400d82 Christos Stavrakakis
        """Check that messages have been send and close the connection."""
345 db400d82 Christos Stavrakakis
        try:
346 db400d82 Christos Stavrakakis
            if self.confirms:
347 db400d82 Christos Stavrakakis
                self.get_confirms()
348 db400d82 Christos Stavrakakis
            close_promise = self.client.close()
349 db400d82 Christos Stavrakakis
            self.client.wait(close_promise)
350 db400d82 Christos Stavrakakis
        except (socket_error, spec_exceptions.ConnectionForced) as e:
351 db400d82 Christos Stavrakakis
            logger.error('Connection closed while closing connection:%s',
352 db400d82 Christos Stavrakakis
                          e)
353 db400d82 Christos Stavrakakis
354 db400d82 Christos Stavrakakis
    def queue_delete(self, queue, if_unused=True, if_empty=True):
355 db400d82 Christos Stavrakakis
        """Delete a queue.
356 db400d82 Christos Stavrakakis

357 db400d82 Christos Stavrakakis
        Returns False if the queue does not exist
358 db400d82 Christos Stavrakakis
        """
359 db400d82 Christos Stavrakakis
        try:
360 db400d82 Christos Stavrakakis
            promise = self.client.queue_delete(queue=queue,
361 db400d82 Christos Stavrakakis
                                               if_unused=if_unused,
362 db400d82 Christos Stavrakakis
                                               if_empty=if_empty)
363 db400d82 Christos Stavrakakis
            self.client.wait(promise)
364 db400d82 Christos Stavrakakis
            return True
365 db400d82 Christos Stavrakakis
        except spec_exceptions.NotFound:
366 db400d82 Christos Stavrakakis
            logger.info("Queue %s does not exist", queue)
367 db400d82 Christos Stavrakakis
            return False
368 db400d82 Christos Stavrakakis
369 db400d82 Christos Stavrakakis
    def exchange_delete(self, exchange, if_unused=True):
370 db400d82 Christos Stavrakakis
        """Delete an exchange."""
371 db400d82 Christos Stavrakakis
        try:
372 db400d82 Christos Stavrakakis
373 db400d82 Christos Stavrakakis
            promise = self.client.exchange_delete(exchange=exchange,
374 db400d82 Christos Stavrakakis
                                                  if_unused=if_unused)
375 db400d82 Christos Stavrakakis
            self.client.wait(promise)
376 db400d82 Christos Stavrakakis
            return True
377 db400d82 Christos Stavrakakis
        except spec_exceptions.NotFound:
378 db400d82 Christos Stavrakakis
            logger.info("Exchange %s does not exist", exchange)
379 db400d82 Christos Stavrakakis
            return False
380 db400d82 Christos Stavrakakis
381 db400d82 Christos Stavrakakis
    @reconnect_decorator
382 db400d82 Christos Stavrakakis
    def basic_cancel(self, promise=None):
383 db400d82 Christos Stavrakakis
        """Cancel consuming from a queue. """
384 db400d82 Christos Stavrakakis
        if promise is not None:
385 db400d82 Christos Stavrakakis
            self.client.basic_cancel(promise)
386 db400d82 Christos Stavrakakis
        else:
387 db400d82 Christos Stavrakakis
            for promise in self.consume_promises:
388 db400d82 Christos Stavrakakis
                self.client.basic_cancel(promise)
389 db400d82 Christos Stavrakakis
390 db400d82 Christos Stavrakakis
391 b537ac01 Christos Stavrakakis
class AMQPConnectionError(Exception):
392 db400d82 Christos Stavrakakis
    pass