Revision e7f0eb53 logic/dispatcher.py

b/logic/dispatcher.py
62 62

  
63 63
from synnefo.logic import callbacks
64 64

  
65
# Queue names
66
QUEUES = []
67

  
68
# Queue bindings to exchanges
69
BINDINGS = []
70

  
65 71
class Dispatcher:
66 72

  
67 73
    logger = None
......
95 101
        self.chan.close()
96 102

  
97 103
    def _init(self):
104
        global QUEUES, BINDINGS
98 105
        self.logger.info("Initializing")
99 106

  
100
        # Queue declarations
101
        prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
102

  
103
        QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
104
        QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
105
        QUEUE_CRON_CREDITS = "%s-credits" % prefix
106
        QUEUE_EMAIL = "%s-email" % prefix
107
        QUEUE_RECONC = "%s-reconciliation" % prefix
108
        if settings.DEBUG is True:
109
            QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
110

  
111
        QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
112
                  QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC)
113

  
114
        if settings.DEBUG is True:
115
            BINDINGS_DEBUG = [
116
                # Queue       # Exchange          # RouteKey  # Handler
117
                (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
118
                (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
119
                (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
120
            ]
121
            QUEUES += (QUEUE_DEBUG,)
122

  
123
        # notifications of type "ganeti-op-status"
124
        DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
125
        # notifications of type "ganeti-net-status"
126
        DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
127

  
128
        BINDINGS = [
129
        # Queue                   # Exchange                # RouteKey          # Handler
130
        (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,  'update_db'),
131
        (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
132
        (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',      'update_credits'),
133
        (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',        'send_email'),
134
        (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',        'send_email'),
135
        (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*', 'trigger_status_update'),
136
        ]
137

  
138 107
        # Connect to RabbitMQ
139 108
        conn = None
140 109
        while conn == None:
......
162 131

  
163 132
        bindings = BINDINGS
164 133

  
165
        # Special queue for debugging, should not appear in production
166
        if self.debug and settings.DEBUG:
167
            self.chan.queue_declare(queue=QUEUE_DEBUG, durable=True,
168
                                    exclusive=False, auto_delete=False)
169
            bindings += BINDINGS_DEBUG
170

  
171 134
        # Bind queues to handler methods
172 135
        for binding in bindings:
173 136
            try:
......
184 147
            self.clienttags.append(tag)
185 148

  
186 149

  
150
def _init_queues():
151
    global QUEUES, BINDINGS
152

  
153
    # Queue declarations
154
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
155

  
156
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
157
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
158
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
159
    QUEUE_EMAIL = "%s-email" % prefix
160
    QUEUE_RECONC = "%s-reconciliation" % prefix
161
    if settings.DEBUG is True:
162
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
163

  
164
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
165
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC)
166

  
167
    # notifications of type "ganeti-op-status"
168
    DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
169
    # notifications of type "ganeti-net-status"
170
    DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
171

  
172
    BINDINGS = [
173
    # Queue                   # Exchange                # RouteKey          # Handler
174
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,  'update_db'),
175
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
176
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',      'update_credits'),
177
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',        'send_email'),
178
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',        'send_email'),
179
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*', 'trigger_status_update'),
180
    ]
181

  
182
    if settings.DEBUG is True:
183
        BINDINGS += [
184
            # Queue       # Exchange          # RouteKey  # Handler
185
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
186
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
187
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
188
        ]
189
        QUEUES += (QUEUE_DEBUG,)
190

  
191

  
187 192
def _exit_handler(signum, frame):
188 193
    """"Catch exit signal in children processes."""
189 194
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
......
237 242
    """
238 243
        Delete declared queues from RabbitMQ. Use with care!
239 244
    """
245
    global QUEUES, BINDINGS
240 246
    conn = get_connection()
241 247
    chan = conn.channel()
242 248

  
243
    print "Queues to be deleted: ",  settings.QUEUES
249
    print "Queues to be deleted: ", QUEUES
244 250

  
245 251
    if not get_user_confirmation():
246 252
        return
247 253

  
248
    for queue in settings.QUEUES:
254
    for queue in QUEUES:
249 255
        try:
250 256
            chan.queue_delete(queue=queue)
251 257
            print "Deleting queue %s" % queue
......
260 266
    """
261 267
        Delete declared exchanges from RabbitMQ, after removing all queues first
262 268
    """
269
    global QUEUES, BINDINGS
263 270
    purge_queues()
264 271

  
265 272
    conn = get_connection()
......
283 290
    """
284 291
        Strip a (declared) queue from all outstanding messages
285 292
    """
293
    global QUEUES, BINDINGS
286 294
    if not queue:
287 295
        return
288 296

  
289
    if not queue in settings.QUEUES:
297
    if not queue in QUEUES:
290 298
        print "Queue %s not configured" % queue
291 299
        return
292 300

  
......
298 306
    chan = conn.channel()
299 307

  
300 308
    # Register a temporary queue binding
301
    for binding in settings.BINDINGS:
309
    for binding in BINDINGS:
302 310
        if binding[0] == queue:
303 311
            exch = binding[1]
304 312

  
......
358 366

  
359 367
    logger = log.get_logger("synnefo.dispatcher")
360 368

  
369
    # Init the global variables containing the queues
370
    _init_queues()
371

  
361 372
    # Special case for the clean up queues action
362 373
    if opts.purge_queues:
363 374
        purge_queues()

Also available in: Unified diff