Revision 659de616 snf-cyclades-app/synnefo/logic/dispatcher.py

b/snf-cyclades-app/synnefo/logic/dispatcher.py
62 62

  
63 63
from synnefo.lib.amqp import AMQPClient
64 64
from synnefo.logic import callbacks
65
from synnefo.logic import queues
65 66

  
66 67
import logging
67 68
log = logging.getLogger()
68 69

  
69
# Queue names
70
QUEUES = []
71

  
72
# Queue bindings to exchanges
73
BINDINGS = []
74

  
75 70

  
76 71
class Dispatcher:
77 72
    debug = False
......
106 101
        self.client.close()
107 102

  
108 103
    def _init(self):
109
        global QUEUES, BINDINGS
110 104
        log.info("Initializing")
111 105

  
112 106
        self.client = AMQPClient()
......
114 108
        self.client.connect()
115 109

  
116 110
        # Declare queues and exchanges
117
        for exchange in settings.EXCHANGES:
118
            self.client.exchange_declare(exchange=exchange,
119
                                         type="topic")
120

  
121
        for queue in QUEUES:
111
        exchange = settings.EXCHANGE_GANETI
112
        exchange_dl = queues.convert_exchange_to_dead(exchange)
113
        self.client.exchange_declare(exchange=exchange,
114
                                     type="topic")
115
        self.client.exchange_declare(exchange=exchange_dl,
116
                                     type="topic")
117

  
118
        for queue in queues.QUEUES:
122 119
            # Queues are mirrored to all RabbitMQ brokers
123
            self.client.queue_declare(queue=queue, mirrored=True)
124

  
125
        bindings = BINDINGS
120
            self.client.queue_declare(queue=queue, mirrored=True,
121
                                      dead_letter_exchange=exchange_dl)
122
            # Declare the corresponding dead-letter queue
123
            queue_dl = queues.convert_queue_to_dead(queue)
124
            self.client.queue_declare(queue=queue_dl, mirrored=True)
126 125

  
127 126
        # Bind queues to handler methods
128
        for binding in bindings:
127
        for binding in queues.BINDINGS:
129 128
            try:
130 129
                callback = getattr(callbacks, binding[3])
131 130
            except AttributeError:
132 131
                log.error("Cannot find callback %s", binding[3])
133 132
                raise SystemExit(1)
133
            queue = binding[0]
134
            exchange = binding[1]
135
            routing_key = binding[2]
134 136

  
135
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
136
                                   routing_key=binding[2])
137
            self.client.queue_bind(queue=queue, exchange=exchange,
138
                                   routing_key=routing_key)
137 139

  
138 140
            self.client.basic_consume(queue=binding[0],
139 141
                                      callback=callback,
140 142
                                      prefetch_count=5)
141 143

  
144
            queue_dl = queues.convert_queue_to_dead(queue)
145
            exchange_dl = queues.convert_exchange_to_dead(exchange)
146
            # Bind the corresponding dead-letter queue
147
            self.client.queue_bind(queue=queue_dl,
148
                                   exchange=exchange_dl,
149
                                   routing_key=routing_key)
150

  
142 151
            log.debug("Binding %s(%s) to queue %s with handler %s",
143
                      binding[1], binding[2], binding[0], binding[3])
144

  
145

  
146
def _init_queues():
147
    global QUEUES, BINDINGS
148

  
149
    # Queue declarations
150
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
151

  
152
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
153
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
154
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
155
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
156
    QUEUE_RECONC = "%s-reconciliation" % prefix
157
    if settings.DEBUG is True:
158
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
159

  
160
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
161
              QUEUE_GANETI_BUILD_PROGR)
162

  
163
    # notifications of type "ganeti-op-status"
164
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
165
    # notifications of type "ganeti-network-status"
166
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
167
    # notifications of type "ganeti-net-status"
168
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
169
    # notifications of type "ganeti-create-progress"
170
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
171
    # reconciliation
172
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
173

  
174
    BINDINGS = [
175
    # Queue                   # Exchange                # RouteKey              # Handler
176
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
177
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
178
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
179
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
180
    ]
181

  
182
    if settings.DEBUG is True:
183
        BINDINGS += [
184
            # Queue       # Exchange          # RouteKey  # Handler
185
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
186
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
187
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
188
        ]
189
        QUEUES += (QUEUE_DEBUG,)
152
                      exchange, routing_key, queue, binding[3])
190 153

  
191 154

  
192 155
def parse_arguments(args):
......
219 182
    """
220 183
        Delete declared queues from RabbitMQ. Use with care!
221 184
    """
222
    global QUEUES, BINDINGS
223 185
    client = AMQPClient(max_retries=120)
224 186
    client.connect()
225 187

  
226
    print "Queues to be deleted: ", QUEUES
188
    print "Queues to be deleted: ", queues.QUEUES
227 189

  
228 190
    if not get_user_confirmation():
229 191
        return
230 192

  
231
    for queue in QUEUES:
193
    for queue in queues.QUEUES:
232 194
        result = client.queue_delete(queue=queue)
233 195
        print "Deleting queue %s. Result: %s" % (queue, result)
234 196

  
......
237 199

  
238 200
def purge_exchanges():
239 201
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
240
    global QUEUES, BINDINGS
241 202
    purge_queues()
242 203

  
243 204
    client = AMQPClient()
244 205
    client.connect()
245 206

  
246
    print "Exchanges to be deleted: ", settings.EXCHANGES
207
    exchanges = queues.EXCHANGES
208
    print "Exchanges to be deleted: ", exchanges
247 209

  
248 210
    if not get_user_confirmation():
249 211
        return
250 212

  
251
    for exchange in settings.EXCHANGES:
252
        result = client.exchange_delete(exchange=exchange)
253
        print "Deleting exchange %s. Result: %s" % (exchange, result)
254

  
213
    for exch in exchanges:
214
        result = client.exchange_delete(exchange=exch)
215
        print "Deleting exchange %s. Result: %s" % (exch, result)
255 216
    client.close()
256 217

  
257 218

  
258 219
def drain_queue(queue):
259 220
    """Strip a (declared) queue from all outstanding messages"""
260
    global QUEUES, BINDINGS
261 221
    if not queue:
262 222
        return
263 223

  
264
    if not queue in QUEUES:
224
    if not queue in queues.QUEUES:
265 225
        print "Queue %s not configured" % queue
266 226
        return
267 227

  
......
336 296
    setproctitle.setproctitle(sys.argv[0])
337 297
    setup_logging(opts)
338 298

  
339
    # Init the global variables containing the queues
340
    _init_queues()
341

  
342 299
    # Special case for the clean up queues action
343 300
    if opts.purge_queues:
344 301
        purge_queues()

Also available in: Unified diff