Revision bd444a11 logic/dispatcher.py

b/logic/dispatcher.py
96 96

  
97 97
    def _init(self):
98 98
        self.logger.info("Initializing")
99
        
99

  
100
        # Queue declarations
101
        prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
102

  
103
        QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
104
        QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
105
        QUEUE_CRON_CREDITS = "%s-credits" % prefix
106
        QUEUE_EMAIL = "%s-email" % prefix
107
        QUEUE_RECONC = "%s-reconciliation" % prefix
108
        if settings.DEBUG is True:
109
            QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
110

  
111
        QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
112
                  QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC)
113

  
114
        if settings.DEBUG is True:
115
            BINDINGS_DEBUG = [
116
                # Queue       # Exchange          # RouteKey  # Handler
117
                (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
118
                (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
119
                (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
120
            ]
121
            QUEUES += (QUEUE_DEBUG,)
122

  
123
        # notifications of type "ganeti-op-status"
124
        DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
125
        # notifications of type "ganeti-net-status"
126
        DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
127

  
128
        BINDINGS = [
129
        # Queue                   # Exchange                # RouteKey          # Handler
130
        (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,  'update_db'),
131
        (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
132
        (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',      'update_credits'),
133
        (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',        'send_email'),
134
        (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',        'send_email'),
135
        (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*', 'trigger_status_update'),
136
        ]
137

  
100 138
        # Connect to RabbitMQ
101 139
        conn = None
102 140
        while conn == None:
......
118 156
            self.chan.exchange_declare(exchange=exchange, type="topic",
119 157
                                       durable=True, auto_delete=False)
120 158

  
121
        for queue in settings.QUEUES:
159
        for queue in QUEUES:
122 160
            self.chan.queue_declare(queue=queue, durable=True,
123 161
                                    exclusive=False, auto_delete=False)
124 162

  
125
        bindings = settings.BINDINGS
163
        bindings = BINDINGS
126 164

  
127 165
        # Special queue for debugging, should not appear in production
128 166
        if self.debug and settings.DEBUG:
129
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
167
            self.chan.queue_declare(queue=QUEUE_DEBUG, durable=True,
130 168
                                    exclusive=False, auto_delete=False)
131
            bindings += settings.BINDINGS_DEBUG
169
            bindings += BINDINGS_DEBUG
132 170

  
133 171
        # Bind queues to handler methods
134 172
        for binding in bindings:

Also available in: Unified diff