Revision 659de616

b/snf-cyclades-app/synnefo/app_settings/default/queues.py
9 9
AMQP_BACKEND = 'puka'
10 10

  
11 11
EXCHANGE_GANETI = "ganeti"  # Messages from Ganeti
12
EXCHANGE_CRON = "cron"      # Messages from periodically triggered tasks
13
EXCHANGE_API = "api"        # Messages from the REST API
14
EXCHANGES = (EXCHANGE_GANETI, EXCHANGE_CRON, EXCHANGE_API)
b/snf-cyclades-app/synnefo/logic/dispatcher.py
62 62

  
63 63
from synnefo.lib.amqp import AMQPClient
64 64
from synnefo.logic import callbacks
65
from synnefo.logic import queues
65 66

  
66 67
import logging
67 68
log = logging.getLogger()
68 69

  
69
# Queue names
70
QUEUES = []
71

  
72
# Queue bindings to exchanges
73
BINDINGS = []
74

  
75 70

  
76 71
class Dispatcher:
77 72
    debug = False
......
106 101
        self.client.close()
107 102

  
108 103
    def _init(self):
109
        global QUEUES, BINDINGS
110 104
        log.info("Initializing")
111 105

  
112 106
        self.client = AMQPClient()
......
114 108
        self.client.connect()
115 109

  
116 110
        # Declare queues and exchanges
117
        for exchange in settings.EXCHANGES:
118
            self.client.exchange_declare(exchange=exchange,
119
                                         type="topic")
120

  
121
        for queue in QUEUES:
111
        exchange = settings.EXCHANGE_GANETI
112
        exchange_dl = queues.convert_exchange_to_dead(exchange)
113
        self.client.exchange_declare(exchange=exchange,
114
                                     type="topic")
115
        self.client.exchange_declare(exchange=exchange_dl,
116
                                     type="topic")
117

  
118
        for queue in queues.QUEUES:
122 119
            # Queues are mirrored to all RabbitMQ brokers
123
            self.client.queue_declare(queue=queue, mirrored=True)
124

  
125
        bindings = BINDINGS
120
            self.client.queue_declare(queue=queue, mirrored=True,
121
                                      dead_letter_exchange=exchange_dl)
122
            # Declare the corresponding dead-letter queue
123
            queue_dl = queues.convert_queue_to_dead(queue)
124
            self.client.queue_declare(queue=queue_dl, mirrored=True)
126 125

  
127 126
        # Bind queues to handler methods
128
        for binding in bindings:
127
        for binding in queues.BINDINGS:
129 128
            try:
130 129
                callback = getattr(callbacks, binding[3])
131 130
            except AttributeError:
132 131
                log.error("Cannot find callback %s", binding[3])
133 132
                raise SystemExit(1)
133
            queue = binding[0]
134
            exchange = binding[1]
135
            routing_key = binding[2]
134 136

  
135
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
136
                                   routing_key=binding[2])
137
            self.client.queue_bind(queue=queue, exchange=exchange,
138
                                   routing_key=routing_key)
137 139

  
138 140
            self.client.basic_consume(queue=binding[0],
139 141
                                      callback=callback,
140 142
                                      prefetch_count=5)
141 143

  
144
            queue_dl = queues.convert_queue_to_dead(queue)
145
            exchange_dl = queues.convert_exchange_to_dead(exchange)
146
            # Bind the corresponding dead-letter queue
147
            self.client.queue_bind(queue=queue_dl,
148
                                   exchange=exchange_dl,
149
                                   routing_key=routing_key)
150

  
142 151
            log.debug("Binding %s(%s) to queue %s with handler %s",
143
                      binding[1], binding[2], binding[0], binding[3])
144

  
145

  
146
def _init_queues():
147
    global QUEUES, BINDINGS
148

  
149
    # Queue declarations
150
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
151

  
152
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
153
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
154
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
155
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
156
    QUEUE_RECONC = "%s-reconciliation" % prefix
157
    if settings.DEBUG is True:
158
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
159

  
160
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
161
              QUEUE_GANETI_BUILD_PROGR)
162

  
163
    # notifications of type "ganeti-op-status"
164
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
165
    # notifications of type "ganeti-network-status"
166
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
167
    # notifications of type "ganeti-net-status"
168
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
169
    # notifications of type "ganeti-create-progress"
170
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
171
    # reconciliation
172
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
173

  
174
    BINDINGS = [
175
    # Queue                   # Exchange                # RouteKey              # Handler
176
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
177
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
178
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
179
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
180
    ]
181

  
182
    if settings.DEBUG is True:
183
        BINDINGS += [
184
            # Queue       # Exchange          # RouteKey  # Handler
185
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
186
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
187
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
188
        ]
189
        QUEUES += (QUEUE_DEBUG,)
152
                      exchange, routing_key, queue, binding[3])
190 153

  
191 154

  
192 155
def parse_arguments(args):
......
219 182
    """
220 183
        Delete declared queues from RabbitMQ. Use with care!
221 184
    """
222
    global QUEUES, BINDINGS
223 185
    client = AMQPClient(max_retries=120)
224 186
    client.connect()
225 187

  
226
    print "Queues to be deleted: ", QUEUES
188
    print "Queues to be deleted: ", queues.QUEUES
227 189

  
228 190
    if not get_user_confirmation():
229 191
        return
230 192

  
231
    for queue in QUEUES:
193
    for queue in queues.QUEUES:
232 194
        result = client.queue_delete(queue=queue)
233 195
        print "Deleting queue %s. Result: %s" % (queue, result)
234 196

  
......
237 199

  
238 200
def purge_exchanges():
239 201
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
240
    global QUEUES, BINDINGS
241 202
    purge_queues()
242 203

  
243 204
    client = AMQPClient()
244 205
    client.connect()
245 206

  
246
    print "Exchanges to be deleted: ", settings.EXCHANGES
207
    exchanges = queues.EXCHANGES
208
    print "Exchanges to be deleted: ", exchanges
247 209

  
248 210
    if not get_user_confirmation():
249 211
        return
250 212

  
251
    for exchange in settings.EXCHANGES:
252
        result = client.exchange_delete(exchange=exchange)
253
        print "Deleting exchange %s. Result: %s" % (exchange, result)
254

  
213
    for exch in exchanges:
214
        result = client.exchange_delete(exchange=exch)
215
        print "Deleting exchange %s. Result: %s" % (exch, result)
255 216
    client.close()
256 217

  
257 218

  
258 219
def drain_queue(queue):
259 220
    """Strip a (declared) queue from all outstanding messages"""
260
    global QUEUES, BINDINGS
261 221
    if not queue:
262 222
        return
263 223

  
264
    if not queue in QUEUES:
224
    if not queue in queues.QUEUES:
265 225
        print "Queue %s not configured" % queue
266 226
        return
267 227

  
......
336 296
    setproctitle.setproctitle(sys.argv[0])
337 297
    setup_logging(opts)
338 298

  
339
    # Init the global variables containing the queues
340
    _init_queues()
341

  
342 299
    # Special case for the clean up queues action
343 300
    if opts.purge_queues:
344 301
        purge_queues()
b/snf-cyclades-app/synnefo/logic/queues.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

  
30

  
31
from synnefo.settings import BACKEND_PREFIX_ID, DEBUG, EXCHANGE_GANETI
32

  
33
try:
34
    prefix = BACKEND_PREFIX_ID.split('-')[0]
35
except TypeError, IndexError:
36
    raise Exception("Invalid BACKEND_PREFIX_ID")
37

  
38
# EXCHANGES
39
EXCHANGES = (EXCHANGE_GANETI,)
40

  
41

  
42
# QUEUES
43
QUEUE_OP = "%s-events-op" % prefix
44
QUEUE_NETWORK = "%s-events-network" % prefix
45
QUEUE_NET = "%s-events-net" % prefix
46
QUEUE_PROGRESS = "%s-events-progress" % prefix
47

  
48

  
49
QUEUES = (QUEUE_OP,
50
          QUEUE_NETWORK,
51
          QUEUE_NET,
52
          QUEUE_PROGRESS)
53

  
54
# ROUTING KEYS
55
# notifications of type "ganeti-op-status"
56
KEY_OP = 'ganeti.%s.event.op' % prefix
57
# notifications of type "ganeti-network-status"
58
KEY_NETWORK = 'ganeti.%s.event.network' % prefix
59
# notifications of type "ganeti-net-status"
60
KEY_NET = 'ganeti.%s.event.net' % prefix
61
# notifications of type "ganeti-create-progress"
62
KEY_PROGRESS = 'ganeti.%s.event.progress' % prefix
63

  
64
# BINDINGS:
65
BINDINGS = (
66
# Queue             # Exchange          # RouteKey    # Handler
67
(QUEUE_OP,          EXCHANGE_GANETI,    KEY_OP,       'update_db'),
68
(QUEUE_NETWORK,     EXCHANGE_GANETI,    KEY_NETWORK,  'update_network'),
69
(QUEUE_NET,         EXCHANGE_GANETI,    KEY_NET,      'update_net'),
70
(QUEUE_PROGRESS,    EXCHANGE_GANETI,    KEY_PROGRESS, 'update_build_progress'),
71
)
72

  
73

  
74
## Extra for DEBUG:
75
if DEBUG is True:
76
    # Debug queue, retrieves all messages
77
    QUEUE_DEBUG = "%s-debug" % prefix
78
    QUEUES += (QUEUE_DEBUG,)
79
    BINDINGS += ((QUEUE_DEBUG, EXCHANGE_GANETI, "#", "dummy_proc"),)
80

  
81

  
82
def convert_queue_to_dead(queue):
83
    """Convert the name of a queue to the corresponding dead-letter one"""
84
    return queue + "-dl"
85

  
86

  
87
def convert_exchange_to_dead(exchange):
88
    """Convert the name of an exchange to the corresponding dead-letter one"""
89
    return exchange + "-dl"

Also available in: Unified diff