Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ a3ed7c08

History | View | Annotate | Download (15 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45
import synnefo.settings as settings
46
from synnefo.logic import log
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import time
54
import socket
55
from daemon import daemon
56

    
57
import traceback
58

    
59
# Take care of differences between python-daemon versions.
60
try:
61
    from daemon import pidfile
62
except:
63
    from daemon import pidlockfile
64

    
65
from synnefo.logic import callbacks
66

    
67
# Queue names
68
QUEUES = []
69

    
70
# Queue bindings to exchanges
71
BINDINGS = []
72

    
73

    
74
class Dispatcher:
    """Consume messages from the configured AMQP queues and dispatch
    each one to its handler function in synnefo.logic.callbacks.

    The queue/binding layout is taken from the module-level QUEUES and
    BINDINGS globals, which must be populated (via _init_queues())
    before a Dispatcher is created.
    """

    logger = None
    chan = None
    debug = False
    clienttags = []

    def __init__(self, debug=False):
        # Initialize logger
        self.logger = log.get_logger('synnefo.dispatcher')

        self.debug = debug
        # Use an instance-level list; the class-level `clienttags = []`
        # would be shared by all Dispatcher instances.
        self.clienttags = []
        self._init()

    def wait(self):
        """Block on the channel until SystemExit is raised (by a signal
        handler), reconnecting whenever the broker goes away. On exit,
        cancel all consumers and close the connection.
        """
        while True:
            try:
                self.chan.wait()
            except SystemExit:
                break
            except (amqp.exceptions.AMQPConnectionException, socket.error):
                # The AMQP server went away; rebuild the connection and
                # all bindings/consumers from scratch.
                self.logger.error("Server went away, reconnecting...")
                self._init()
            except Exception:
                # Keep serving; just record the unexpected failure.
                self.logger.exception("Caught unexpected exception")

        for clienttag in self.clienttags:
            self.chan.basic_cancel(clienttag)
        self.chan.connection.close()
        self.chan.close()

    def _init(self):
        """(Re)connect to RabbitMQ and declare exchanges, queues,
        bindings and consumers according to QUEUES/BINDINGS.
        """
        global QUEUES, BINDINGS
        self.logger.info("Initializing")

        # Connect to RabbitMQ, retrying every 10s until it succeeds.
        conn = None
        while conn is None:
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                self.logger.error("Failed to connect to %s, retrying in 10s",
                                  settings.RABBIT_HOST)
                time.sleep(10)

        self.logger.info("Connection successful, opening channel")
        self.chan = conn.channel()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.chan.exchange_declare(exchange=exchange, type="topic",
                                       durable=True, auto_delete=False)

        for queue in QUEUES:
            self.chan.queue_declare(queue=queue, durable=True,
                                    exclusive=False, auto_delete=False)

        # Consumer tags from a previous connection are invalid after a
        # reconnect; drop them so wait() does not cancel stale tags.
        self.clienttags = []

        # Bind queues to handler methods
        for binding in BINDINGS:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                self.logger.error("Cannot find callback %s" % binding[3])
                raise SystemExit(1)

            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
                                 routing_key=binding[2])
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
                              (binding[1], binding[2], binding[0], binding[3]))
            self.clienttags.append(tag)
155

    
156

    
157
def _init_queues():
    """Populate the global QUEUES and BINDINGS from the settings.

    Queue names are prefixed with the first component of
    settings.BACKEND_PREFIX_ID. In DEBUG mode an extra "debug" queue is
    bound to every exchange with the catch-all '#' routing key.
    """
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
    QUEUE_EMAIL = "%s-email" % prefix
    QUEUE_RECONC = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages

    # Keep QUEUES a list, consistent with its module-level declaration
    # (it was previously rebound to a tuple here).
    QUEUES = [QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
              QUEUE_GANETI_BUILD_PROGR]

    # notifications of type "ganeti-op-status"
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
    # notifications of type "ganeti-net-status"
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
    # notifications of type "ganeti-create-progress"
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
    # email
    EMAIL_HANDLER = 'logic.%s.email.*' % prefix
    # reconciliation
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix

    BINDINGS = [
    # Queue                   # Exchange                # RouteKey              # Handler
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'),
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',          'update_credits'),
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    EMAIL_HANDLER,          'send_email'),
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   EMAIL_HANDLER,          'send_email'),
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue       # Exchange          # RouteKey  # Handler
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
        ]
        QUEUES += [QUEUE_DEBUG]
206

    
207

    
208
def _exit_handler(signum, frame):
    """Catch exit signal in children processes.

    Installed for SIGINT/SIGTERM; raising SystemExit lets the dispatch
    loop unwind and clean up.
    """
    global logger
    logger.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit
213

    
214

    
215
def _parent_handler(signum, frame):
    """Catch exit signal in parent process and forward it to children."""
    global children, logger
    logger.info("Caught signal %d, sending SIGTERM to children %s",
                signum, children)
    # Plain loop instead of a side-effect-only list comprehension.
    for pid in children:
        os.kill(pid, SIGTERM)
221

    
222

    
223
def child(cmdline):
    """The context of the child process"""
    # Parse the command line the worker was spawned with.
    opts, args = parse_arguments(cmdline)

    # Create a dispatcher and enter its event loop; this only returns
    # after a SystemExit is raised by a signal handler.
    dispatcher = Dispatcher(debug=opts.debug)
    dispatcher.wait()
232

    
233

    
234
def parse_arguments(args):
    """Parse command-line options; return optparse's (options, args)."""
    from optparse import OptionParser

    # Compute the default pidfile path once, for both the default value
    # and the help text.
    default_pid_file = os.path.join(os.getcwd(), "dispatcher.pid")

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", '--pid-file', dest="pid_file",
                      default=default_pid_file,
                      help="Save PID to file (default:%s)" % default_pid_file)
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Strips a queue from all outstanding messages")

    return parser.parse_args(args)
257

    
258

    
259
def purge_queues():
260
    """
261
        Delete declared queues from RabbitMQ. Use with care!
262
    """
263
    global QUEUES, BINDINGS
264
    conn = get_connection()
265
    chan = conn.channel()
266

    
267
    print "Queues to be deleted: ", QUEUES
268

    
269
    if not get_user_confirmation():
270
        return
271

    
272
    for queue in QUEUES:
273
        try:
274
            chan.queue_delete(queue=queue)
275
            print "Deleting queue %s" % queue
276
        except amqp.exceptions.AMQPChannelException as e:
277
            print e.amqp_reply_code, " ", e.amqp_reply_text
278
            chan = conn.channel()
279

    
280
    chan.connection.close()
281

    
282

    
283
def purge_exchanges():
284
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
285
    global QUEUES, BINDINGS
286
    purge_queues()
287

    
288
    conn = get_connection()
289
    chan = conn.channel()
290

    
291
    print "Exchanges to be deleted: ", settings.EXCHANGES
292

    
293
    if not get_user_confirmation():
294
        return
295

    
296
    for exchange in settings.EXCHANGES:
297
        try:
298
            chan.exchange_delete(exchange=exchange)
299
        except amqp.exceptions.AMQPChannelException as e:
300
            print e.amqp_reply_code, " ", e.amqp_reply_text
301

    
302
    chan.connection.close()
303

    
304

    
305
def drain_queue(queue):
306
    """Strip a (declared) queue from all outstanding messages"""
307
    global QUEUES, BINDINGS
308
    if not queue:
309
        return
310

    
311
    if not queue in QUEUES:
312
        print "Queue %s not configured" % queue
313
        return
314

    
315
    print "Queue to be drained: %s" % queue
316

    
317
    if not get_user_confirmation():
318
        return
319
    conn = get_connection()
320
    chan = conn.channel()
321

    
322
    # Register a temporary queue binding
323
    for binding in BINDINGS:
324
        if binding[0] == queue:
325
            exch = binding[1]
326

    
327
    if not exch:
328
        print "Queue not bound to any exchange: %s" % queue
329
        return
330

    
331
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
332
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
333

    
334
    print "Queue draining about to start, hit Ctrl+c when done"
335
    time.sleep(2)
336
    print "Queue draining starting"
337

    
338
    signal(SIGTERM, _exit_handler)
339
    signal(SIGINT, _exit_handler)
340

    
341
    num_processed = 0
342
    while True:
343
        chan.wait()
344
        num_processed += 1
345
        sys.stderr.write("Ignored %d messages\r" % num_processed)
346

    
347
    chan.basic_cancel(tag)
348
    chan.connection.close()
349

    
350

    
351
def get_connection():
    """Open and return a fresh AMQP connection to the configured broker."""
    return amqp.Connection(host=settings.RABBIT_HOST,
                           userid=settings.RABBIT_USERNAME,
                           password=settings.RABBIT_PASSWORD,
                           virtual_host=settings.RABBIT_VHOST)
357

    
358

    
359
def get_user_confirmation():
    """Prompt the user; return True only on an explicit 'y' or 'Y'.

    An empty answer or anything else counts as "no".
    """
    answer = raw_input("Are you sure (N/y):")
    return answer in ['Y', 'y']
367

    
368

    
369
def debug_mode():
    """Run one dispatcher in the foreground (no daemon, no workers)."""
    dispatcher = Dispatcher(debug=True)

    # Exit cleanly on Ctrl+c / SIGTERM.
    signal(SIGINT, _exit_handler)
    signal(SIGTERM, _exit_handler)

    dispatcher.wait()
375

    
376

    
377
def daemon_mode(opts):
    """Fork opts.workers dispatcher children and wait for them to exit.

    Assumes the process has already been daemonized and that the
    module-level `logger` has been set up (both done by main()).
    Holds the pidfile for the whole lifetime of the workers and
    releases it on the way out.
    """
    global children, logger

    # Create pidfile,
    # take care of differences between python-daemon versions
    try:
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
    except:
        # Fallback for older python-daemon, where the class lives in the
        # pidlockfile module (see the module-level import fallback).
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    pidf.acquire()

    logger.info("Became a daemon")

    # Fork workers
    children = []

    i = 0
    while i < opts.workers:
        newpid = os.fork()

        if newpid == 0:
            # Child process: install exit handlers, run the dispatch
            # loop, and terminate without returning to the parent's
            # code path below.
            signal(SIGINT, _exit_handler)
            signal(SIGTERM, _exit_handler)
            child(sys.argv[1:])
            sys.exit(1)
        else:
            # Parent process: remember the child's pid so signals can be
            # forwarded to it later.
            pids = (os.getpid(), newpid)
            logger.debug("%d, forked child: %d" % pids)
            children.append(pids[1])
        i += 1

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, _parent_handler)
    signal(SIGTERM, _parent_handler)

    # Wait for all children processes to die, one by one
    try:
        for pid in children:
            try:
                # Ignore reaping errors (e.g. interrupted waitpid) so the
                # remaining children are still waited for.
                os.waitpid(pid, 0)
            except Exception:
                pass
    finally:
        pidf.release()
422

    
423

    
424
def main():
    """Program entry point.

    Parses the command line, initializes the queue layout, and then runs
    exactly one of: queue purge, exchange purge, queue drain, foreground
    debug mode, or the default daemonized multi-worker dispatch.
    """
    global logger
    (opts, args) = parse_arguments(sys.argv[1:])

    logger = log.get_logger("synnefo.dispatcher")

    # Init the global variables containing the queues
    _init_queues()

    # Special case for the clean up queues action
    if opts.purge_queues:
        purge_queues()
        return

    # Special case for the clean up exch action
    if opts.purge_exchanges:
        purge_exchanges()
        return

    if opts.drain_queue:
        drain_queue(opts.drain_queue)
        return

    # Debug mode, process messages without spawning workers
    if opts.debug:
        log.console_output(logger)
        debug_mode()
        return

    # Redirect stdout and stderr to the fileno of the first
    # file-based handler for this logger
    stdout_stderr_handler = None
    files_preserve = None
    for handler in logger.handlers:
        if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
            stdout_stderr_handler = handler.stream
            files_preserve = [handler.stream]
            break

    # files_preserve keeps the log stream open across daemonization;
    # 022 is a Python 2 octal literal (umask rw-r--r-- for new files).
    daemon_context = daemon.DaemonContext(
        stdout=stdout_stderr_handler,
        stderr=stdout_stderr_handler,
        files_preserve=files_preserve,
        umask=022)

    daemon_context.open()

    # Catch every exception, make sure it gets logged properly
    try:
        daemon_mode(opts)
    except Exception:
        exc = "".join(traceback.format_exception(*sys.exc_info()))
        logger.critical(exc)
        raise
478

    
479

    
480
# Script entry point; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
482

    
483
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :