Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ 5f6ad491

History | View | Annotate | Download (13.4 kB)

1 db400d82 Christos Stavrakakis
# Copyright 2012 GRNET S.A. All rights reserved.
2 db400d82 Christos Stavrakakis
#
3 db400d82 Christos Stavrakakis
# Redistribution and use in source and binary forms, with or
4 db400d82 Christos Stavrakakis
# without modification, are permitted provided that the following
5 db400d82 Christos Stavrakakis
# conditions are met:
6 db400d82 Christos Stavrakakis
#
7 db400d82 Christos Stavrakakis
#   1. Redistributions of source code must retain the above
8 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
9 db400d82 Christos Stavrakakis
#      disclaimer.
10 db400d82 Christos Stavrakakis
#
11 db400d82 Christos Stavrakakis
#   2. Redistributions in binary form must reproduce the above
12 db400d82 Christos Stavrakakis
#      copyright notice, this list of conditions and the following
13 db400d82 Christos Stavrakakis
#      disclaimer in the documentation and/or other materials
14 db400d82 Christos Stavrakakis
#      provided with the distribution.
15 db400d82 Christos Stavrakakis
#
16 db400d82 Christos Stavrakakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 db400d82 Christos Stavrakakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 db400d82 Christos Stavrakakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 db400d82 Christos Stavrakakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 db400d82 Christos Stavrakakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 db400d82 Christos Stavrakakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 db400d82 Christos Stavrakakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 db400d82 Christos Stavrakakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 db400d82 Christos Stavrakakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 db400d82 Christos Stavrakakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 db400d82 Christos Stavrakakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 db400d82 Christos Stavrakakis
# POSSIBILITY OF SUCH DAMAGE.
28 db400d82 Christos Stavrakakis
#
29 db400d82 Christos Stavrakakis
# The views and conclusions contained in the software and
30 db400d82 Christos Stavrakakis
# documentation are those of the authors and should not be
31 db400d82 Christos Stavrakakis
# interpreted as representing official policies, either expressed
32 db400d82 Christos Stavrakakis
# or implied, of GRNET S.A.
33 db400d82 Christos Stavrakakis
34 db400d82 Christos Stavrakakis
""" Module implementing connection and communication with an AMQP broker.
35 db400d82 Christos Stavrakakis

36 db400d82 Christos Stavrakakis
AMQP Client's implemented by this module silenty detect connection failures and
37 db400d82 Christos Stavrakakis
try to reconnect to any available broker. Also publishing takes advantage of
38 db400d82 Christos Stavrakakis
publisher-confirms in order to guarantee that messages are properly delivered
39 db400d82 Christos Stavrakakis
to the broker.
40 db400d82 Christos Stavrakakis

41 db400d82 Christos Stavrakakis
"""
42 db400d82 Christos Stavrakakis
43 db400d82 Christos Stavrakakis
import logging
44 db400d82 Christos Stavrakakis
45 db400d82 Christos Stavrakakis
from puka import Client
46 db400d82 Christos Stavrakakis
from puka import spec_exceptions
47 db400d82 Christos Stavrakakis
from socket import error as socket_error
48 db400d82 Christos Stavrakakis
from time import sleep
49 db400d82 Christos Stavrakakis
from random import shuffle
50 db400d82 Christos Stavrakakis
from functools import wraps
51 db400d82 Christos Stavrakakis
from ordereddict import OrderedDict
52 db400d82 Christos Stavrakakis
from synnefo import settings
53 db400d82 Christos Stavrakakis
54 db400d82 Christos Stavrakakis
logging.basicConfig(level=logging.DEBUG,
55 db400d82 Christos Stavrakakis
                    format="[%(levelname)s %(asctime)s] %(message)s")
56 db400d82 Christos Stavrakakis
logger = logging.getLogger()
57 db400d82 Christos Stavrakakis
58 db400d82 Christos Stavrakakis
59 db400d82 Christos Stavrakakis
def reconnect_decorator(func):
60 db400d82 Christos Stavrakakis
    """
61 db400d82 Christos Stavrakakis
    Decorator for persistent connection with one or more AMQP brokers.
62 db400d82 Christos Stavrakakis

63 db400d82 Christos Stavrakakis
    """
64 db400d82 Christos Stavrakakis
    @wraps(func)
65 db400d82 Christos Stavrakakis
    def wrapper(self, *args, **kwargs):
66 db400d82 Christos Stavrakakis
        try:
67 e0b68525 Christos Stavrakakis
            return func(self, *args, **kwargs)
68 db400d82 Christos Stavrakakis
        except (socket_error, spec_exceptions.ConnectionForced) as e:
69 db400d82 Christos Stavrakakis
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
70 db400d82 Christos Stavrakakis
            self.consume_promises = []
71 db400d82 Christos Stavrakakis
            self.connect()
72 db400d82 Christos Stavrakakis
73 db400d82 Christos Stavrakakis
    return wrapper
74 db400d82 Christos Stavrakakis
75 db400d82 Christos Stavrakakis
76 db400d82 Christos Stavrakakis
class AMQPPukaClient(object):
77 db400d82 Christos Stavrakakis
    """
78 db400d82 Christos Stavrakakis
    AMQP generic client implementing most of the basic AMQP operations.
79 db400d82 Christos Stavrakakis

80 db400d82 Christos Stavrakakis
    """
81 db400d82 Christos Stavrakakis
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
82 db400d82 Christos Stavrakakis
                 confirms=True, confirm_buffer=100):
83 db400d82 Christos Stavrakakis
        """Format hosts as "amqp://username:pass@host:port" """
84 db400d82 Christos Stavrakakis
85 db400d82 Christos Stavrakakis
        self.hosts = hosts
86 db400d82 Christos Stavrakakis
        shuffle(self.hosts)
87 db400d82 Christos Stavrakakis
88 db400d82 Christos Stavrakakis
        self.max_retries = max_retries
89 db400d82 Christos Stavrakakis
        self.confirms = confirms
90 db400d82 Christos Stavrakakis
        self.confirm_buffer = confirm_buffer
91 db400d82 Christos Stavrakakis
92 db400d82 Christos Stavrakakis
        self.connection = None
93 db400d82 Christos Stavrakakis
        self.channel = None
94 db400d82 Christos Stavrakakis
        self.consumers = {}
95 db400d82 Christos Stavrakakis
        self.unacked = OrderedDict()
96 db400d82 Christos Stavrakakis
        self.unsend = OrderedDict()
97 db400d82 Christos Stavrakakis
        self.consume_promises = []
98 25649e21 Christos Stavrakakis
        self.exchanges = []
99 db400d82 Christos Stavrakakis
100 db400d82 Christos Stavrakakis
    def connect(self, retries=0):
101 db400d82 Christos Stavrakakis
        if retries > self.max_retries:
102 db400d82 Christos Stavrakakis
            logger.error("Aborting after %s retries", retries - 1)
103 db400d82 Christos Stavrakakis
            raise AMQPConnectionError('Aborting after %d connection failures.'\
104 db400d82 Christos Stavrakakis
                                      % (retries - 1))
105 db400d82 Christos Stavrakakis
            return
106 db400d82 Christos Stavrakakis
107 db400d82 Christos Stavrakakis
        # Pick up a host
108 db400d82 Christos Stavrakakis
        host = self.hosts.pop()
109 db400d82 Christos Stavrakakis
        self.hosts.insert(0, host)
110 db400d82 Christos Stavrakakis
111 db400d82 Christos Stavrakakis
        self.client = Client(host, pubacks=self.confirms)
112 db400d82 Christos Stavrakakis
113 db400d82 Christos Stavrakakis
        host = host.split('@')[-1]
114 db400d82 Christos Stavrakakis
        logger.debug('Connecting to node %s' % host)
115 db400d82 Christos Stavrakakis
116 db400d82 Christos Stavrakakis
        try:
117 db400d82 Christos Stavrakakis
            promise = self.client.connect()
118 db400d82 Christos Stavrakakis
            self.client.wait(promise)
119 db400d82 Christos Stavrakakis
        except socket_error as e:
120 db400d82 Christos Stavrakakis
            logger.error('Cannot connect to host %s: %s', host, e)
121 db400d82 Christos Stavrakakis
            if retries > 2 * len(self.hosts):
122 db400d82 Christos Stavrakakis
                sleep(1)
123 db400d82 Christos Stavrakakis
            return self.connect(retries + 1)
124 db400d82 Christos Stavrakakis
125 db400d82 Christos Stavrakakis
        logger.info('Successfully connected to host: %s', host)
126 db400d82 Christos Stavrakakis
127 db400d82 Christos Stavrakakis
        logger.info('Creating channel')
128 db400d82 Christos Stavrakakis
129 db400d82 Christos Stavrakakis
        if self.unacked:
130 db400d82 Christos Stavrakakis
            self._resend_unacked_messages()
131 db400d82 Christos Stavrakakis
132 db400d82 Christos Stavrakakis
        if self.unsend:
133 db400d82 Christos Stavrakakis
            self._resend_unsend_messages()
134 db400d82 Christos Stavrakakis
135 db400d82 Christos Stavrakakis
        if self.consumers:
136 db400d82 Christos Stavrakakis
            for queue, callback in self.consumers.items():
137 db400d82 Christos Stavrakakis
                self.basic_consume(queue, callback)
138 db400d82 Christos Stavrakakis
139 25649e21 Christos Stavrakakis
        if self.exchanges:
140 25649e21 Christos Stavrakakis
            exchanges = self.exchanges
141 25649e21 Christos Stavrakakis
            self.exchanges = []
142 25649e21 Christos Stavrakakis
            for exchange, type in exchanges:
143 25649e21 Christos Stavrakakis
                self.exchange_declare(exchange, type)
144 25649e21 Christos Stavrakakis
145 db400d82 Christos Stavrakakis
    def exchange_declare(self, exchange, type='direct'):
146 db400d82 Christos Stavrakakis
        """Declare an exchange
147 db400d82 Christos Stavrakakis
        @type exchange_name: string
148 db400d82 Christos Stavrakakis
        @param exchange_name: name of the exchange
149 db400d82 Christos Stavrakakis
        @type exchange_type: string
150 db400d82 Christos Stavrakakis
        @param exhange_type: one of 'direct', 'topic', 'fanout'
151 db400d82 Christos Stavrakakis

152 db400d82 Christos Stavrakakis
        """
153 db400d82 Christos Stavrakakis
        logger.info('Declaring %s exchange: %s', type, exchange)
154 db400d82 Christos Stavrakakis
        promise = self.client.exchange_declare(exchange=exchange,
155 db400d82 Christos Stavrakakis
                                               type=type,
156 db400d82 Christos Stavrakakis
                                               durable=True,
157 db400d82 Christos Stavrakakis
                                               auto_delete=False)
158 db400d82 Christos Stavrakakis
        self.client.wait(promise)
159 25649e21 Christos Stavrakakis
        self.exchanges.append((exchange, type))
160 db400d82 Christos Stavrakakis
161 db400d82 Christos Stavrakakis
    @reconnect_decorator
162 db400d82 Christos Stavrakakis
    def queue_declare(self, queue, exclusive=False,
163 db400d82 Christos Stavrakakis
                      mirrored=True, mirrored_nodes='all'):
164 db400d82 Christos Stavrakakis
        """Declare a queue
165 db400d82 Christos Stavrakakis

166 db400d82 Christos Stavrakakis
        @type queue: string
167 db400d82 Christos Stavrakakis
        @param queue: name of the queue
168 db400d82 Christos Stavrakakis
        @param mirrored: whether the queue will be mirrored to other brokers
169 db400d82 Christos Stavrakakis
        @param mirrored_nodes: the policy for the mirrored queue.
170 db400d82 Christos Stavrakakis
            Available policies:
171 db400d82 Christos Stavrakakis
                - 'all': The queue is mirrored to all nodes and the
172 db400d82 Christos Stavrakakis
                  master node is the one to which the client is
173 db400d82 Christos Stavrakakis
                  connected
174 db400d82 Christos Stavrakakis
                - a list of nodes. The queue will be mirrored only to
175 db400d82 Christos Stavrakakis
                  the specified nodes, and the master will be the
176 db400d82 Christos Stavrakakis
                  first node in the list. Node names must be provided
177 db400d82 Christos Stavrakakis
                  and not host IP. example: [node1@rabbit,node2@rabbit]
178 db400d82 Christos Stavrakakis

179 db400d82 Christos Stavrakakis
        """
180 db400d82 Christos Stavrakakis
        logger.info('Declaring queue: %s', queue)
181 db400d82 Christos Stavrakakis
182 db400d82 Christos Stavrakakis
        if mirrored:
183 db400d82 Christos Stavrakakis
            if mirrored_nodes == 'all':
184 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'all'}
185 db400d82 Christos Stavrakakis
            elif isinstance(mirrored_nodes, list):
186 db400d82 Christos Stavrakakis
                arguments = {'x-ha-policy': 'nodes',
187 db400d82 Christos Stavrakakis
                           'x-ha-policy-params': mirrored_nodes}
188 db400d82 Christos Stavrakakis
            else:
189 db400d82 Christos Stavrakakis
                raise AttributeError
190 db400d82 Christos Stavrakakis
        else:
191 db400d82 Christos Stavrakakis
            arguments = {}
192 db400d82 Christos Stavrakakis
193 db400d82 Christos Stavrakakis
        promise = self.client.queue_declare(queue=queue, durable=True,
194 db400d82 Christos Stavrakakis
                                            exclusive=exclusive,
195 db400d82 Christos Stavrakakis
                                            auto_delete=False,
196 db400d82 Christos Stavrakakis
                                            arguments=arguments)
197 db400d82 Christos Stavrakakis
        self.client.wait(promise)
198 db400d82 Christos Stavrakakis
199 db400d82 Christos Stavrakakis
    def queue_bind(self, queue, exchange, routing_key):
200 db400d82 Christos Stavrakakis
        logger.debug('Binding queue %s to exchange %s with key %s'
201 db400d82 Christos Stavrakakis
                 % (queue, exchange, routing_key))
202 db400d82 Christos Stavrakakis
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
203 db400d82 Christos Stavrakakis
                                         routing_key=routing_key)
204 db400d82 Christos Stavrakakis
        self.client.wait(promise)
205 db400d82 Christos Stavrakakis
206 db400d82 Christos Stavrakakis
    @reconnect_decorator
207 db400d82 Christos Stavrakakis
    def basic_publish(self, exchange, routing_key, body):
208 db400d82 Christos Stavrakakis
        """Publish a message with a specific routing key """
209 db400d82 Christos Stavrakakis
        self._publish(exchange, routing_key, body)
210 db400d82 Christos Stavrakakis
211 db400d82 Christos Stavrakakis
        self.flush_buffer()
212 db400d82 Christos Stavrakakis
213 db400d82 Christos Stavrakakis
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
214 db400d82 Christos Stavrakakis
            self.get_confirms()
215 db400d82 Christos Stavrakakis
216 db400d82 Christos Stavrakakis
    @reconnect_decorator
217 db400d82 Christos Stavrakakis
    def basic_publish_multi(self, exchange, routing_key, bodies):
218 db400d82 Christos Stavrakakis
        for body in bodies:
219 db400d82 Christos Stavrakakis
            self.unsend[body] = (exchange, routing_key)
220 db400d82 Christos Stavrakakis
221 db400d82 Christos Stavrakakis
        for body in bodies:
222 db400d82 Christos Stavrakakis
            self._publish(exchange, routing_key, body)
223 db400d82 Christos Stavrakakis
            self.unsend.pop(body)
224 db400d82 Christos Stavrakakis
225 db400d82 Christos Stavrakakis
        self.flush_buffer()
226 db400d82 Christos Stavrakakis
227 db400d82 Christos Stavrakakis
        if self.confirms:
228 db400d82 Christos Stavrakakis
            self.get_confirms()
229 db400d82 Christos Stavrakakis
230 db400d82 Christos Stavrakakis
    def _publish(self, exchange, routing_key, body):
231 db400d82 Christos Stavrakakis
        # Persisent messages by default!
232 db400d82 Christos Stavrakakis
        headers = {}
233 db400d82 Christos Stavrakakis
        headers['delivery_mode'] = 2
234 db400d82 Christos Stavrakakis
        promise = self.client.basic_publish(exchange=exchange,
235 db400d82 Christos Stavrakakis
                                            routing_key=routing_key,
236 db400d82 Christos Stavrakakis
                                            body=body, headers=headers)
237 db400d82 Christos Stavrakakis
238 db400d82 Christos Stavrakakis
        if self.confirms:
239 db400d82 Christos Stavrakakis
            self.unacked[promise] = (exchange, routing_key, body)
240 db400d82 Christos Stavrakakis
241 db400d82 Christos Stavrakakis
        return promise
242 db400d82 Christos Stavrakakis
243 db400d82 Christos Stavrakakis
    @reconnect_decorator
244 db400d82 Christos Stavrakakis
    def flush_buffer(self):
245 db400d82 Christos Stavrakakis
        while self.client.needs_write():
246 db400d82 Christos Stavrakakis
            self.client.on_write()
247 db400d82 Christos Stavrakakis
248 db400d82 Christos Stavrakakis
    @reconnect_decorator
249 db400d82 Christos Stavrakakis
    def get_confirms(self):
250 db400d82 Christos Stavrakakis
        for promise in self.unacked.keys():
251 db400d82 Christos Stavrakakis
            self.client.wait(promise)
252 db400d82 Christos Stavrakakis
            self.unacked.pop(promise)
253 db400d82 Christos Stavrakakis
254 db400d82 Christos Stavrakakis
    @reconnect_decorator
255 db400d82 Christos Stavrakakis
    def _resend_unacked_messages(self):
256 db400d82 Christos Stavrakakis
        """Resend unacked messages in case of a connection failure."""
257 db400d82 Christos Stavrakakis
        msgs = self.unacked.values()
258 db400d82 Christos Stavrakakis
        self.unacked.clear()
259 db400d82 Christos Stavrakakis
        for exchange, routing_key, body in msgs:
260 db400d82 Christos Stavrakakis
            logger.debug('Resending message %s' % body)
261 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
262 db400d82 Christos Stavrakakis
263 db400d82 Christos Stavrakakis
    @reconnect_decorator
264 db400d82 Christos Stavrakakis
    def _resend_unsend_messages(self):
265 db400d82 Christos Stavrakakis
        """Resend unsend messages in case of a connection failure."""
266 db400d82 Christos Stavrakakis
        for body in self.unsend.keys():
267 db400d82 Christos Stavrakakis
            (exchange, routing_key) = self.unsend[body]
268 db400d82 Christos Stavrakakis
            self.basic_publish(exchange, routing_key, body)
269 db400d82 Christos Stavrakakis
            self.unsend.pop(body)
270 db400d82 Christos Stavrakakis
271 db400d82 Christos Stavrakakis
    @reconnect_decorator
272 db400d82 Christos Stavrakakis
    def basic_consume(self, queue, callback, prefetch_count=0):
273 db400d82 Christos Stavrakakis
        """Consume from a queue.
274 db400d82 Christos Stavrakakis

275 db400d82 Christos Stavrakakis
        @type queue: string or list of strings
276 db400d82 Christos Stavrakakis
        @param queue: the name or list of names from the queues to consume
277 db400d82 Christos Stavrakakis
        @type callback: function
278 db400d82 Christos Stavrakakis
        @param callback: the callback function to run when a message arrives
279 db400d82 Christos Stavrakakis

280 db400d82 Christos Stavrakakis
        """
281 db400d82 Christos Stavrakakis
        # Store the queues and the callback
282 db400d82 Christos Stavrakakis
        self.consumers[queue] = callback
283 db400d82 Christos Stavrakakis
284 db400d82 Christos Stavrakakis
        def handle_delivery(promise, msg):
285 db400d82 Christos Stavrakakis
            """Hide promises and messages without body"""
286 db400d82 Christos Stavrakakis
            if 'body' in msg:
287 db400d82 Christos Stavrakakis
                callback(self, msg)
288 db400d82 Christos Stavrakakis
            else:
289 db400d82 Christos Stavrakakis
                logger.debug("Message without body %s" % msg)
290 db400d82 Christos Stavrakakis
                raise socket_error
291 db400d82 Christos Stavrakakis
292 db400d82 Christos Stavrakakis
        consume_promise = \
293 db400d82 Christos Stavrakakis
                self.client.basic_consume(queue=queue,
294 db400d82 Christos Stavrakakis
                                          prefetch_count=prefetch_count,
295 db400d82 Christos Stavrakakis
                                          callback=handle_delivery)
296 db400d82 Christos Stavrakakis
297 db400d82 Christos Stavrakakis
        self.consume_promises.append(consume_promise)
298 db400d82 Christos Stavrakakis
        return consume_promise
299 db400d82 Christos Stavrakakis
300 db400d82 Christos Stavrakakis
    @reconnect_decorator
301 db400d82 Christos Stavrakakis
    def basic_wait(self, promise=None, timeout=0):
302 db400d82 Christos Stavrakakis
        """Wait for messages from the queues declared by basic_consume.
303 db400d82 Christos Stavrakakis

304 db400d82 Christos Stavrakakis
        This function will block until a message arrives from the queues that
305 db400d82 Christos Stavrakakis
        have been declared with basic_consume. If the optional arguments
306 db400d82 Christos Stavrakakis
        'promise' is given, only messages for this promise will be delivered.
307 db400d82 Christos Stavrakakis

308 db400d82 Christos Stavrakakis
        """
309 db400d82 Christos Stavrakakis
        if promise is not None:
310 db400d82 Christos Stavrakakis
            self.client.wait(promise, timeout)
311 db400d82 Christos Stavrakakis
        else:
312 db400d82 Christos Stavrakakis
            self.client.wait(self.consume_promises)
313 db400d82 Christos Stavrakakis
314 db400d82 Christos Stavrakakis
    @reconnect_decorator
315 db400d82 Christos Stavrakakis
    def basic_get(self, queue):
316 db400d82 Christos Stavrakakis
        """Get a single message from a queue.
317 db400d82 Christos Stavrakakis

318 db400d82 Christos Stavrakakis
        This is a non-blocking method for getting messages from a queue.
319 db400d82 Christos Stavrakakis
        It will return None if the queue is empty.
320 db400d82 Christos Stavrakakis

321 db400d82 Christos Stavrakakis
        """
322 db400d82 Christos Stavrakakis
        get_promise = self.client.basic_get(queue=queue)
323 db400d82 Christos Stavrakakis
        result = self.client.wait(get_promise)
324 db400d82 Christos Stavrakakis
        if 'empty' in result:
325 db400d82 Christos Stavrakakis
            # The queue is empty
326 db400d82 Christos Stavrakakis
            return None
327 db400d82 Christos Stavrakakis
        else:
328 db400d82 Christos Stavrakakis
            return result
329 db400d82 Christos Stavrakakis
330 db400d82 Christos Stavrakakis
    @reconnect_decorator
331 db400d82 Christos Stavrakakis
    def basic_ack(self, message):
332 db400d82 Christos Stavrakakis
        self.client.basic_ack(message)
333 db400d82 Christos Stavrakakis
334 db400d82 Christos Stavrakakis
    @reconnect_decorator
335 db400d82 Christos Stavrakakis
    def basic_nack(self, message):
336 db400d82 Christos Stavrakakis
        #TODO:
337 db400d82 Christos Stavrakakis
        pass
338 db400d82 Christos Stavrakakis
339 db400d82 Christos Stavrakakis
    def close(self):
340 db400d82 Christos Stavrakakis
        """Check that messages have been send and close the connection."""
341 db400d82 Christos Stavrakakis
        try:
342 db400d82 Christos Stavrakakis
            if self.confirms:
343 db400d82 Christos Stavrakakis
                self.get_confirms()
344 db400d82 Christos Stavrakakis
            close_promise = self.client.close()
345 db400d82 Christos Stavrakakis
            self.client.wait(close_promise)
346 db400d82 Christos Stavrakakis
        except (socket_error, spec_exceptions.ConnectionForced) as e:
347 db400d82 Christos Stavrakakis
            logger.error('Connection closed while closing connection:%s',
348 db400d82 Christos Stavrakakis
                          e)
349 db400d82 Christos Stavrakakis
350 db400d82 Christos Stavrakakis
    def queue_delete(self, queue, if_unused=True, if_empty=True):
351 db400d82 Christos Stavrakakis
        """Delete a queue.
352 db400d82 Christos Stavrakakis

353 db400d82 Christos Stavrakakis
        Returns False if the queue does not exist
354 db400d82 Christos Stavrakakis
        """
355 db400d82 Christos Stavrakakis
        try:
356 db400d82 Christos Stavrakakis
            promise = self.client.queue_delete(queue=queue,
357 db400d82 Christos Stavrakakis
                                               if_unused=if_unused,
358 db400d82 Christos Stavrakakis
                                               if_empty=if_empty)
359 db400d82 Christos Stavrakakis
            self.client.wait(promise)
360 db400d82 Christos Stavrakakis
            return True
361 db400d82 Christos Stavrakakis
        except spec_exceptions.NotFound:
362 db400d82 Christos Stavrakakis
            logger.info("Queue %s does not exist", queue)
363 db400d82 Christos Stavrakakis
            return False
364 db400d82 Christos Stavrakakis
365 db400d82 Christos Stavrakakis
    def exchange_delete(self, exchange, if_unused=True):
366 db400d82 Christos Stavrakakis
        """Delete an exchange."""
367 db400d82 Christos Stavrakakis
        try:
368 db400d82 Christos Stavrakakis
369 db400d82 Christos Stavrakakis
            promise = self.client.exchange_delete(exchange=exchange,
370 db400d82 Christos Stavrakakis
                                                  if_unused=if_unused)
371 db400d82 Christos Stavrakakis
            self.client.wait(promise)
372 db400d82 Christos Stavrakakis
            return True
373 db400d82 Christos Stavrakakis
        except spec_exceptions.NotFound:
374 db400d82 Christos Stavrakakis
            logger.info("Exchange %s does not exist", exchange)
375 db400d82 Christos Stavrakakis
            return False
376 db400d82 Christos Stavrakakis
377 db400d82 Christos Stavrakakis
    @reconnect_decorator
378 db400d82 Christos Stavrakakis
    def basic_cancel(self, promise=None):
379 db400d82 Christos Stavrakakis
        """Cancel consuming from a queue. """
380 db400d82 Christos Stavrakakis
        if promise is not None:
381 db400d82 Christos Stavrakakis
            self.client.basic_cancel(promise)
382 db400d82 Christos Stavrakakis
        else:
383 db400d82 Christos Stavrakakis
            for promise in self.consume_promises:
384 db400d82 Christos Stavrakakis
                self.client.basic_cancel(promise)
385 db400d82 Christos Stavrakakis
386 db400d82 Christos Stavrakakis
387 db400d82 Christos Stavrakakis
class AMQPConnectionError():
388 db400d82 Christos Stavrakakis
    pass