Revision 2bf8d695 logic/dispatcher.py

b/logic/dispatcher.py
70 70
# Queue bindings to exchanges
71 71
BINDINGS = []
72 72

  
73

  
73 74
class Dispatcher:
74 75

  
75 76
    logger = None
......
77 78
    debug = False
78 79
    clienttags = []
79 80

  
80
    def __init__(self, debug = False):
81
    def __init__(self, debug=False):
81 82

  
82 83
        # Initialize logger
83 84
        self.logger = log.get_logger('synnefo.dispatcher')
......
119 120
                                       password=settings.RABBIT_PASSWORD,
120 121
                                       virtual_host=settings.RABBIT_VHOST)
121 122
            except socket.error:
122
                self.logger.error("Failed to connect to %s, retrying in 10s...",
123
                self.logger.error("Failed to connect to %s, retrying in 10s",
123 124
                                  settings.RABBIT_HOST)
124 125
                time.sleep(10)
125 126

  
......
173 174
              QUEUE_GANETI_BUILD_PROGR)
174 175

  
175 176
    # notifications of type "ganeti-op-status"
176
    DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
177
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
177 178
    # notifications of type "ganeti-net-status"
178
    DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
179
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
179 180
    # notifications of type "ganeti-create-progress"
180 181
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
181 182

  
182 183
    BINDINGS = [
183
    # Queue                   # Exchange                # RouteKey              # Handler
184
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
185
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
186
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
187
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',          'update_credits'),
188
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',            'send_email'),
189
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',            'send_email'),
190
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*',     'trigger_status_update'),
184
    # Queue                    # Exchange                # RouteKey
185
    # Handler
186
    (QUEUE_GANETI_EVENTS_OP,   settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,
187
     'update_db'),
188
    (QUEUE_GANETI_EVENTS_NET,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,
189
     'update_net'),
190
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,
191
     'update_build_progress'),
192
    (QUEUE_CRON_CREDITS,       settings.EXCHANGE_CRON,   '*.credits.*',
193
     'update_credits'),
194
    (QUEUE_EMAIL,              settings.EXCHANGE_API,    '*.email.*',
195
     'send_email'),
196
    (QUEUE_EMAIL,              settings.EXCHANGE_CRON,   '*.email.*',
197
     'send_email'),
198
    (QUEUE_RECONC,             settings.EXCHANGE_CRON,   'reconciliation.*',
199
     'trigger_status_update'),
191 200
    ]
192 201

  
193 202
    if settings.DEBUG is True:
......
323 332
        print "Queue not bound to any exchange: %s" % queue
324 333
        return
325 334

  
326
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
335
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
327 336
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
328 337

  
329 338
    print "Queue draining about to start, hit Ctrl+c when done"
......
362 371

  
363 372

  
364 373
def debug_mode():
365
    disp = Dispatcher(debug = True)
374
    disp = Dispatcher(debug=True)
366 375
    signal(SIGINT, _exit_handler)
367 376
    signal(SIGTERM, _exit_handler)
368 377

  

Also available in: Unified diff