
logic/dispatcher.py @ 2bf8d695


#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#   1. Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.


"""Message queue setup, dispatch and admin.

This program sets up connections to the queues configured in settings.py
and implements the message wait and dispatch loops. Actual messages are
handled by the dispatched callback functions.

"""
from django.core.management import setup_environ

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
import synnefo.settings as settings
from synnefo.logic import log

setup_environ(settings)

from amqplib import client_0_8 as amqp
from signal import signal, SIGINT, SIGTERM

import time
import socket
from daemon import daemon

import traceback

# Take care of differences between python-daemon versions.
try:
    from daemon import pidfile
except ImportError:
    from daemon import pidlockfile

from synnefo.logic import callbacks
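
# The names below are read from synnefo.settings at runtime. The values shown
# here are only an illustrative sketch of what a deployment might configure,
# not the project's actual defaults:
#
#     RABBIT_HOST = "127.0.0.1"            # AMQP broker endpoint
#     RABBIT_USERNAME = "synnefo"
#     RABBIT_PASSWORD = "example-password"
#     RABBIT_VHOST = "/"
#     EXCHANGE_GANETI = "ganeti"           # one exchange per message source
#     EXCHANGE_CRON = "cron"
#     EXCHANGE_API = "api"
#     EXCHANGES = (EXCHANGE_GANETI, EXCHANGE_CRON, EXCHANGE_API)
#     BACKEND_PREFIX_ID = "snf-1"          # queue name prefix becomes "snf"
#     DEBUG = False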

# Queue names
QUEUES = []

# Queue bindings to exchanges
BINDINGS = []


class Dispatcher:
    """Consume messages from the configured queues and dispatch each one
    to its bound handler in synnefo.logic.callbacks.
    """

    logger = None
    chan = None
    debug = False
    clienttags = []

    def __init__(self, debug=False):

        # Initialize logger
        self.logger = log.get_logger('synnefo.dispatcher')

        self.debug = debug
        self._init()

    def wait(self):
        while True:
            try:
                self.chan.wait()
            except SystemExit:
                break
            except (amqp.exceptions.AMQPConnectionException, socket.error):
                self.logger.error("Server went away, reconnecting...")
                self._init()
            except Exception:
                self.logger.exception("Caught unexpected exception")

        # Cancel all consumers, then shut down the channel and connection.
        for clienttag in self.clienttags:
            self.chan.basic_cancel(clienttag)
        self.chan.close()
        self.chan.connection.close()

    def _init(self):
        global QUEUES, BINDINGS
        self.logger.info("Initializing")

        # Connect to RabbitMQ
        conn = None
        while conn is None:
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                self.logger.error("Failed to connect to %s, retrying in 10s",
                                  settings.RABBIT_HOST)
                time.sleep(10)

        self.logger.info("Connection successful, opening channel")
        self.chan = conn.channel()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.chan.exchange_declare(exchange=exchange, type="topic",
                                       durable=True, auto_delete=False)

        for queue in QUEUES:
            self.chan.queue_declare(queue=queue, durable=True,
                                    exclusive=False, auto_delete=False)

        # Bind queues to handler methods
        for binding in BINDINGS:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                self.logger.error("Cannot find callback %s" % binding[3])
                raise SystemExit(1)

            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
                                 routing_key=binding[2])
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
                              (binding[1], binding[2], binding[0], binding[3]))
            self.clienttags.append(tag)
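
# Typical usage of Dispatcher (this mirrors child() and debug_mode() further
# below): instantiate it, which connects to the broker and declares and binds
# the queues, then block in wait() until a signal handler raises SystemExit.
#
#     disp = Dispatcher(debug=False)
#     disp.wait()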


def _init_queues():
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
    QUEUE_EMAIL = "%s-email" % prefix
    QUEUE_RECONC = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages

    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
              QUEUE_GANETI_BUILD_PROGR)

    # notifications of type "ganeti-op-status"
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
    # notifications of type "ganeti-net-status"
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
    # notifications of type "ganeti-create-progress"
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix

    BINDINGS = [
    # Queue                    # Exchange                # RouteKey
    # Handler
    (QUEUE_GANETI_EVENTS_OP,   settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,
     'update_db'),
    (QUEUE_GANETI_EVENTS_NET,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,
     'update_net'),
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,
     'update_build_progress'),
    (QUEUE_CRON_CREDITS,       settings.EXCHANGE_CRON,   '*.credits.*',
     'update_credits'),
    (QUEUE_EMAIL,              settings.EXCHANGE_API,    '*.email.*',
     'send_email'),
    (QUEUE_EMAIL,              settings.EXCHANGE_CRON,   '*.email.*',
     'send_email'),
    (QUEUE_RECONC,             settings.EXCHANGE_CRON,   'reconciliation.*',
     'trigger_status_update'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue       # Exchange          # RouteKey  # Handler
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
        ]
        QUEUES += (QUEUE_DEBUG,)
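
# Routing example (assuming BACKEND_PREFIX_ID yields the prefix "snf"): a
# message published on EXCHANGE_GANETI with routing key 'ganeti.snf.event.op'
# is delivered to the 'snf-events-op' queue and handled by
# callbacks.update_db. With DEBUG enabled it also matches the '#' wildcard
# bindings above and lands in the "debug" queue, consumed by
# callbacks.dummy_proc.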


def _exit_handler(signum, frame):
    """Catch exit signal in child processes."""
    global logger
    logger.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit


def _parent_handler(signum, frame):
    """Catch exit signal in the parent process and forward it to children."""
    global children, logger
    logger.info("Caught signal %d, sending SIGTERM to children %s",
                signum, children)
    for pid in children:
        os.kill(pid, SIGTERM)


def child(cmdline):
    """The context of the child process"""

    # Cmd line argument parsing
    (opts, args) = parse_arguments(cmdline)
    disp = Dispatcher(debug=opts.debug)

    # Start the event loop
    disp.wait()


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", "--pid-file", dest="pid_file",
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
                      help="Save PID to file (default: %s)" %
                           os.path.join(os.getcwd(), "dispatcher.pid"))
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Strip a queue of all outstanding messages")

    return parser.parse_args(args)
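
# Example invocations (paths and queue names here are only illustrative):
#
#     ./dispatcher.py                          # daemonize, fork 2 workers
#     ./dispatcher.py -d                       # debug mode, log to console
#     ./dispatcher.py -w 4 -p /tmp/dispatcher.pid
#     ./dispatcher.py --drain-queue snf-email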


def purge_queues():
    """Delete declared queues from RabbitMQ. Use with care!"""
    global QUEUES, BINDINGS
    conn = get_connection()
    chan = conn.channel()

    print "Queues to be deleted: ", QUEUES

    if not get_user_confirmation():
        return

    for queue in QUEUES:
        try:
            chan.queue_delete(queue=queue)
            print "Deleting queue %s" % queue
        except amqp.exceptions.AMQPChannelException as e:
            print e.amqp_reply_code, " ", e.amqp_reply_text
            chan = conn.channel()

    chan.connection.close()


def purge_exchanges():
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
    global QUEUES, BINDINGS
    purge_queues()

    conn = get_connection()
    chan = conn.channel()

    print "Exchanges to be deleted: ", settings.EXCHANGES

    if not get_user_confirmation():
        return

    for exchange in settings.EXCHANGES:
        try:
            chan.exchange_delete(exchange=exchange)
        except amqp.exceptions.AMQPChannelException as e:
            print e.amqp_reply_code, " ", e.amqp_reply_text

    chan.connection.close()


def drain_queue(queue):
    """Strip a (declared) queue of all outstanding messages"""
    global QUEUES, BINDINGS
    if not queue:
        return

    if queue not in QUEUES:
        print "Queue %s not configured" % queue
        return

    print "Queue to be drained: %s" % queue

    if not get_user_confirmation():
        return
    conn = get_connection()
    chan = conn.channel()

    # Register a temporary queue binding
    exch = None
    for binding in BINDINGS:
        if binding[0] == queue:
            exch = binding[1]

    if not exch:
        print "Queue not bound to any exchange: %s" % queue
        return

    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)

    print "Queue draining about to start, hit Ctrl+C when done"
    time.sleep(2)
    print "Queue draining starting"

    signal(SIGTERM, _exit_handler)
    signal(SIGINT, _exit_handler)

    # The signal handlers raise SystemExit; catch it so the consumer gets
    # cancelled and the connection closed before returning.
    num_processed = 0
    try:
        while True:
            chan.wait()
            num_processed += 1
            sys.stderr.write("Ignored %d messages\r" % num_processed)
    except SystemExit:
        pass

    chan.basic_cancel(tag)
    chan.connection.close()


def get_connection():
    conn = amqp.Connection(host=settings.RABBIT_HOST,
                           userid=settings.RABBIT_USERNAME,
                           password=settings.RABBIT_PASSWORD,
                           virtual_host=settings.RABBIT_VHOST)
    return conn


def get_user_confirmation():
    ans = raw_input("Are you sure (N/y):")

    if not ans:
        return False
    if ans not in ['Y', 'y']:
        return False
    return True


def debug_mode():
    disp = Dispatcher(debug=True)
    signal(SIGINT, _exit_handler)
    signal(SIGTERM, _exit_handler)

    disp.wait()


def daemon_mode(opts):
    global children, logger

    # Create the pidfile,
    # taking care of differences between python-daemon versions
    try:
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
    except (NameError, AttributeError):
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    pidf.acquire()

    logger.info("Became a daemon")

    # Fork workers
    children = []

    for _ in range(opts.workers):
        newpid = os.fork()

        if newpid == 0:
            # Worker process: handle termination signals itself and run the
            # dispatch loop.
            signal(SIGINT, _exit_handler)
            signal(SIGTERM, _exit_handler)
            child(sys.argv[1:])
            sys.exit(1)
        else:
            pids = (os.getpid(), newpid)
            logger.debug("%d, forked child: %d" % pids)
            children.append(pids[1])

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, _parent_handler)
    signal(SIGTERM, _parent_handler)

    # Wait for all child processes to die, one by one
    try:
        for pid in children:
            try:
                os.waitpid(pid, 0)
            except Exception:
                pass
    finally:
        pidf.release()
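
# Resulting process layout (illustrative, with the default of 2 workers):
#
#     dispatcher.py           parent: holds the pidfile, waits on children
#      |- worker #1           child: runs Dispatcher(...).wait()
#      `- worker #2           child: runs Dispatcher(...).wait()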


def main():
    global logger
    (opts, args) = parse_arguments(sys.argv[1:])

    logger = log.get_logger("synnefo.dispatcher")

    # Init the global variables containing the queues
    _init_queues()

    # Special case for the clean-up-queues action
    if opts.purge_queues:
        purge_queues()
        return

    # Special case for the clean-up-exchanges action
    if opts.purge_exchanges:
        purge_exchanges()
        return

    if opts.drain_queue:
        drain_queue(opts.drain_queue)
        return

    # Debug mode, process messages without spawning workers
    if opts.debug:
        log.console_output(logger)
        debug_mode()
        return

    # Redirect stdout and stderr to the fileno of the first
    # file-based handler for this logger
    stdout_stderr_handler = None
    files_preserve = None
    for handler in logger.handlers:
        if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
            stdout_stderr_handler = handler.stream
            files_preserve = [handler.stream]
            break

    daemon_context = daemon.DaemonContext(
        stdout=stdout_stderr_handler,
        stderr=stdout_stderr_handler,
        files_preserve=files_preserve,
        umask=022)

    daemon_context.open()

    # Catch every exception, make sure it gets logged properly
    try:
        daemon_mode(opts)
    except Exception:
        exc = "".join(traceback.format_exception(*sys.exc_info()))
        logger.critical(exc)
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :