Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_haigha.py @ 1c65202f

History | View | Annotate | Download (10 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from haigha.connections import RabbitConnection
35
from haigha.message import Message
36
from haigha import exceptions
37
from random import shuffle
38
from time import sleep
39
import logging
40
import socket
41
from synnefo import settings
42
from ordereddict import OrderedDict
43
import gevent
44
from gevent import monkey
45
from functools import wraps
46

    
47

    
48
logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" )
49
logger = logging.getLogger('haigha')
50

    
51
sock_opts = {
52
  (socket.IPPROTO_TCP, socket.TCP_NODELAY): 1,
53
}
54

    
55

    
56
def reconnect_decorator(func):
57
    """
58
    Decorator for persistent connection with one or more AMQP brokers.
59

60
    """
61
    @wraps(func)
62
    def wrapper(self, *args, **kwargs):
63
        try:
64
            func(self, *args, **kwargs)
65
        except (socket.error, exceptions.ConnectionError) as e:
66
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
67
            self.connect()
68

    
69
    return wrapper
70

    
71

    
72
class AMQPHaighaClient():
73
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
74
                 confirms=True, confirm_buffer=200):
75
        self.hosts = hosts
76
        shuffle(self.hosts)
77

    
78
        self.max_retries = max_retries
79
        self.confirms = confirms
80
        self.confirm_buffer = confirm_buffer
81

    
82
        self.connection = None
83
        self.channel = None
84
        self.consumers = {}
85
        self.unacked = OrderedDict()
86

    
87
    def connect(self, retries=0):
88
        if retries > self.max_retries:
89
            logger.error("Aborting after %s retries", retries - 1)
90
            raise AMQPConnectionError('Aborting after %d connection failures.'\
91
                                      % (retries - 1))
92
            return
93

    
94
        # Pick up a host
95
        host = self.hosts.pop()
96
        self.hosts.insert(0, host)
97

    
98
        #Patch gevent
99
        monkey.patch_all()
100

    
101
        try:
102
            self.connection = \
103
                 RabbitConnection(logger=logger, debug=True,
104
                      user='rabbit', password='r@bb1t',
105
                      vhost='/', host=host,
106
                      heartbeat=None,
107
                      sock_opts=sock_opts,
108
                      transport='gevent')
109
        except socket.error as e:
110
            logger.error('Cannot connect to host %s: %s', host, e)
111
            if retries > 2 * len(self.hosts):
112
                sleep(1)
113
            return self.connect(retries + 1)
114

    
115
        logger.info('Successfully connected to host: %s', host)
116

    
117
        logger.info('Creating channel')
118
        self.channel = self.connection.channel()
119

    
120
        if self.confirms:
121
            self._confirm_select()
122

    
123
        if self.unacked:
124
            self._resend_unacked_messages()
125

    
126
        if self.consumers:
127
            for queue, callback in self.consumers.items():
128
                self.basic_consume(queue, callback)
129

    
130
    def exchange_declare(self, exchange, type='direct'):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exhange_type: one of 'direct', 'topic', 'fanout'
136

137
        """
138

    
139
        logger.info('Declaring %s exchange: %s', type, exchange)
140
        self.channel.exchange.declare(exchange, type,
141
                                      auto_delete=False, durable=True)
142

    
143
    def queue_declare(self, queue, exclusive=False, mirrored=True,
144
                      mirrored_nodes='all'):
145
        """Declare a queue
146

147
        @type queue: string
148
        @param queue: name of the queue
149
        @param mirrored: whether the queue will be mirrored to other brokers
150
        @param mirrored_nodes: the policy for the mirrored queue.
151
            Available policies:
152
                - 'all': The queue is mirrored to all nodes and the
153
                  master node is the one to which the client is
154
                  connected
155
                - a list of nodes. The queue will be mirrored only to
156
                  the specified nodes, and the master will be the
157
                  first node in the list. Node names must be provided
158
                  and not host IP. example: [node1@rabbit,node2@rabbit]
159

160
        """
161

    
162
        logger.info('Declaring queue: %s', queue)
163
        if mirrored:
164
            if mirrored_nodes == 'all':
165
                arguments = {'x-ha-policy': 'all'}
166
            elif isinstance(mirrored_nodes, list):
167
                arguments = {'x-ha-policy': 'nodes',
168
                             'x-ha-policy-params': mirrored_nodes}
169
            else:
170
                raise AttributeError
171
        else:
172
            arguments = {}
173

    
174
        self.channel.queue.declare(queue, durable=True, exclusive=exclusive,
175
                                   auto_delete=False, arguments=arguments)
176

    
177
    def queue_bind(self, queue, exchange, routing_key):
178
        logger.info('Binding queue %s to exchange %s with key %s', queue,
179
                    exchange, routing_key)
180
        self.channel.queue.bind(queue=queue, exchange=exchange,
181
                                routing_key=routing_key)
182

    
183
    def _confirm_select(self):
184
        logger.info('Setting channel to confirm mode')
185
        self.channel.confirm.select()
186
        self.channel.basic.set_ack_listener(self._ack_received)
187
        self.channel.basic.set_nack_listener(self._nack_received)
188

    
189
    @reconnect_decorator
190
    def basic_publish(self, exchange, routing_key, body):
191
        msg = Message(body, delivery_mode=2)
192
        mid = self.channel.basic.publish(msg, exchange, routing_key)
193
        if self.confirms:
194
            self.unacked[mid] = (exchange, routing_key, body)
195
            if len(self.unacked) > self.confirm_buffer:
196
                self.get_confirms()
197

    
198
        logger.debug('Published message %s with id %s', body, mid)
199

    
200
    @reconnect_decorator
201
    def get_confirms(self):
202
        self.connection.read_frames()
203

    
204
    @reconnect_decorator
205
    def _resend_unacked_messages(self):
206
        msgs = self.unacked.values()
207
        self.unacked.clear()
208
        for exchange, routing_key, body in msgs:
209
            logger.debug('Resending message %s', body)
210
            self.basic_publish(exchange, routing_key, body)
211

    
212
    @reconnect_decorator
213
    def _ack_received(self, mid):
214
        print mid
215
        logger.debug('Received ACK for message with id %s', mid)
216
        self.unacked.pop(mid)
217

    
218
    @reconnect_decorator
219
    def _nack_received(self, mid):
220
        logger.error('Received NACK for message with id %s. Retrying.', mid)
221
        (exchange, routing_key, body) = self.unacked[mid]
222
        self.basic_publish(exchange, routing_key, body)
223

    
224
    def basic_consume(self, queue, callback):
225
        """Consume from a queue.
226

227
        @type queue: string or list of strings
228
        @param queue: the name or list of names from the queues to consume
229
        @type callback: function
230
        @param callback: the callback function to run when a message arrives
231

232
        """
233

    
234
        self.consumers[queue] = callback
235
        self.channel.basic.consume(queue, consumer=callback, no_ack=False)
236

    
237
    @reconnect_decorator
238
    def basic_wait(self):
239
        """Wait for messages from the queues declared by basic_consume.
240

241
        This function will block until a message arrives from the queues that
242
        have been declared with basic_consume. If the optional arguments
243
        'promise' is given, only messages for this promise will be delivered.
244

245
        """
246

    
247
        self.connection.read_frames()
248
        gevent.sleep(0)
249

    
250
    @reconnect_decorator
251
    def basic_get(self, queue):
252
        self.channel.basic.get(queue, no_ack=False)
253

    
254
    @reconnect_decorator
255
    def basic_ack(self, message):
256
        delivery_tag = message.delivery_info['delivery_tag']
257
        self.channel.basic.ack(delivery_tag)
258

    
259
    @reconnect_decorator
260
    def basic_nack(self, message):
261
        delivery_tag = message.delivery_info['delivery_tag']
262
        self.channel.basic.ack(delivery_tag)
263

    
264
    def close(self):
265
        try:
266
            if self.confirms:
267
                while self.unacked:
268
                    print self.unacked
269
                    self.get_confirms()
270
            self.channel.close()
271
            close_info = self.channel.close_info
272
            logger.info('Successfully closed channel. Info: %s', close_info)
273
            self.connection.close()
274
        except socket.error as e:
275
            logger.error('Connection closed while closing connection:%s',
276
                          e)
277

    
278
    def queue_delete(self, queue, if_unused=True, if_empty=True):
279
        self.channel.queue.delete(queue, if_unused, if_empty)
280

    
281
    def exchange_delete(self, exchange, if_unused=True):
282
        self.channel.exchange.delete(exchange, if_unused)
283

    
284
    def basic_class(self):
285
        pass
286

    
287

    
288
class  AMQPConnectionError():
289
    pass