#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#   1. Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#   2. Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.


""" Message queue setup, dispatch and admin

This program sets up connections to the queues configured in settings.py
and implements the message wait and dispatch loops. Actual messages are
handled by the dispatched callback functions.

"""
from django.core.management import setup_environ

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
import synnefo.settings as settings
from synnefo.logic import log

setup_environ(settings)

from amqplib import client_0_8 as amqp
from signal import signal, SIGINT, SIGTERM

import time
import socket
from daemon import daemon

import traceback

# Take care of differences between python-daemon versions.
try:
    from daemon import pidfile
except ImportError:
    from daemon import pidlockfile

from synnefo.logic import callbacks

# Queue names
QUEUES = []

# Queue bindings to exchanges
BINDINGS = []
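# Each BINDINGS entry is a (queue, exchange, routing key, handler name) tuple;
# the handler name is looked up on synnefo.logic.callbacks when the dispatcher
# binds its queues (see Dispatcher._init below).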

class Dispatcher:

    logger = None
    chan = None
    debug = False
    clienttags = []

    def __init__(self, debug=False):

        # Initialize logger
        self.logger = log.get_logger('synnefo.dispatcher')

        self.debug = debug
        self._init()

    def wait(self):
        while True:
            try:
                self.chan.wait()
            except SystemExit:
                break
            except amqp.exceptions.AMQPConnectionException:
                self.logger.error("Server went away, reconnecting...")
                self._init()
            except socket.error:
                self.logger.error("Server went away, reconnecting...")
                self._init()
            except Exception:
                self.logger.exception("Caught unexpected exception")

        for clienttag in self.clienttags:
            self.chan.basic_cancel(clienttag)
        self.chan.close()
        self.chan.connection.close()

    def _init(self):
        global QUEUES, BINDINGS
        self.logger.info("Initializing")

        # Connect to RabbitMQ
        conn = None
        while conn is None:
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                self.logger.error("Failed to connect to %s, retrying in 10s...",
                                  settings.RABBIT_HOST)
                time.sleep(10)

        self.logger.info("Connection successful, opening channel")
        self.chan = conn.channel()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.chan.exchange_declare(exchange=exchange, type="topic",
                                       durable=True, auto_delete=False)

        for queue in QUEUES:
            self.chan.queue_declare(queue=queue, durable=True,
                                    exclusive=False, auto_delete=False)

        bindings = BINDINGS

        # Bind queues to handler methods
        for binding in bindings:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                self.logger.error("Cannot find callback %s" % binding[3])
                raise SystemExit(1)

            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
                                 routing_key=binding[2])
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
                              (binding[1], binding[2], binding[0], binding[3]))
            self.clienttags.append(tag)


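# A minimal sketch of driving the Dispatcher by hand (this is essentially what
# debug_mode() below does; _init_queues() must run first so that QUEUES and
# BINDINGS are populated from settings):
#
#     _init_queues()
#     disp = Dispatcher(debug=True)
#     disp.wait()    # blocks, consuming messages until SystemExit is raised

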
def _init_queues():
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

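    # For example, with a hypothetical BACKEND_PREFIX_ID such as
    # "snf-cluster1", prefix becomes "snf" and the queues below are named
    # "snf-events-op", "snf-events-net", "snf-events-progress", "snf-credits",
    # "snf-email" and "snf-reconciliation".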
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
    QUEUE_EMAIL = "%s-email" % prefix
    QUEUE_RECONC = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages

    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
              QUEUE_GANETI_BUILD_PROGR)

    # notifications of type "ganeti-op-status"
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
    # notifications of type "ganeti-net-status"
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
    # notifications of type "ganeti-create-progress"
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix

    BINDINGS = [
    # Queue                    # Exchange                # RouteKey             # Handler
    (QUEUE_GANETI_EVENTS_OP,   settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,     'update_db'),
    (QUEUE_GANETI_EVENTS_NET,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,    'update_net'),
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'),
    (QUEUE_CRON_CREDITS,       settings.EXCHANGE_CRON,   '*.credits.*',         'update_credits'),
    (QUEUE_EMAIL,              settings.EXCHANGE_API,    '*.email.*',           'send_email'),
    (QUEUE_EMAIL,              settings.EXCHANGE_CRON,   '*.email.*',           'send_email'),
    (QUEUE_RECONC,             settings.EXCHANGE_CRON,   'reconciliation.*',    'trigger_status_update'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue       # Exchange                # RouteKey  # Handler
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',        'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',        'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',        'dummy_proc'),
        ]
        QUEUES += (QUEUE_DEBUG,)


def _exit_handler(signum, frame):
    """Catch exit signal in child processes"""
    global logger
    logger.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit


def _parent_handler(signum, frame):
    """Catch exit signal in parent process and forward it to children."""
    global children, logger
    logger.info("Caught signal %d, sending SIGTERM to children %s",
                signum, children)
    for pid in children:
        os.kill(pid, SIGTERM)


def child(cmdline):
    """The context of the child process"""

    # Cmd line argument parsing
    (opts, args) = parse_arguments(cmdline)
    disp = Dispatcher(debug=opts.debug)

    # Start the event loop
    disp.wait()


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", "--pid-file", dest="pid_file",
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
                      help="Save PID to file (default: %s)" %
                           os.path.join(os.getcwd(), "dispatcher.pid"))
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Drain a queue of all outstanding messages")

    return parser.parse_args(args)


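# Typical invocations, as a sketch based on the options above (the script is
# normally run directly from the directory containing it, so that the
# sys.path manipulation at the top finds the synnefo package):
#
#     python dispatcher.py                     # daemonize with 2 workers
#     python dispatcher.py -d                  # debug mode, log to console
#     python dispatcher.py -w 4 -p /path/to/dispatcher.pid
#     python dispatcher.py --drain-queue <queue-name>

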
def purge_queues():
    """Delete declared queues from RabbitMQ. Use with care!"""
    global QUEUES, BINDINGS
    conn = get_connection()
    chan = conn.channel()

    print "Queues to be deleted: ", QUEUES

    if not get_user_confirmation():
        return

    for queue in QUEUES:
        try:
            chan.queue_delete(queue=queue)
            print "Deleting queue %s" % queue
        except amqp.exceptions.AMQPChannelException as e:
            print e.amqp_reply_code, " ", e.amqp_reply_text
            chan = conn.channel()

    chan.connection.close()


def purge_exchanges():
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
    global QUEUES, BINDINGS
    purge_queues()

    conn = get_connection()
    chan = conn.channel()

    print "Exchanges to be deleted: ", settings.EXCHANGES

    if not get_user_confirmation():
        return

    for exchange in settings.EXCHANGES:
        try:
            chan.exchange_delete(exchange=exchange)
        except amqp.exceptions.AMQPChannelException as e:
            print e.amqp_reply_code, " ", e.amqp_reply_text

    chan.connection.close()


def drain_queue(queue):
    """Drain a (declared) queue of all outstanding messages"""
    global QUEUES, BINDINGS
    if not queue:
        return

    if queue not in QUEUES:
        print "Queue %s not configured" % queue
        return

    print "Queue to be drained: %s" % queue

    if not get_user_confirmation():
        return
    conn = get_connection()
    chan = conn.channel()

    # Register a temporary queue binding
    exch = None
    for binding in BINDINGS:
        if binding[0] == queue:
            exch = binding[1]

    if not exch:
        print "Queue not bound to any exchange: %s" % queue
        return

    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)

    print "Queue draining about to start, hit Ctrl+c when done"
    time.sleep(2)
    print "Queue draining starting"

    signal(SIGTERM, _exit_handler)
    signal(SIGINT, _exit_handler)

    # _exit_handler raises SystemExit on SIGINT/SIGTERM; catch it here so the
    # consumer is cancelled and the connection closed cleanly.
    num_processed = 0
    try:
        while True:
            chan.wait()
            num_processed += 1
            sys.stderr.write("Ignored %d messages\r" % num_processed)
    except SystemExit:
        pass

    chan.basic_cancel(tag)
    chan.connection.close()


def get_connection():
    conn = amqp.Connection(host=settings.RABBIT_HOST,
                           userid=settings.RABBIT_USERNAME,
                           password=settings.RABBIT_PASSWORD,
                           virtual_host=settings.RABBIT_VHOST)
    return conn


def get_user_confirmation():
    ans = raw_input("Are you sure (N/y):")

    if not ans:
        return False
    if ans not in ['Y', 'y']:
        return False
    return True


def debug_mode():
    disp = Dispatcher(debug=True)
    signal(SIGINT, _exit_handler)
    signal(SIGTERM, _exit_handler)

    disp.wait()


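# daemon_mode() forks opts.workers child processes: each child installs the
# exit handlers and runs its own Dispatcher via child(), while the parent
# forwards SIGINT/SIGTERM to the children through _parent_handler and waits
# for them to exit before releasing the pid file.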
def daemon_mode(opts):
    global children, logger

    # Create the pid file,
    # taking care of differences between python-daemon versions
    try:
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
    except (NameError, AttributeError):
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    pidf.acquire()

    logger.info("Became a daemon")

    # Fork workers
    children = []

    i = 0
    while i < opts.workers:
        newpid = os.fork()

        if newpid == 0:
            signal(SIGINT, _exit_handler)
            signal(SIGTERM, _exit_handler)
            child(sys.argv[1:])
            sys.exit(1)
        else:
            pids = (os.getpid(), newpid)
            logger.debug("%d, forked child: %d" % pids)
            children.append(pids[1])
        i += 1

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, _parent_handler)
    signal(SIGTERM, _parent_handler)

    # Wait for all child processes to die, one by one
    try:
        for pid in children:
            try:
                os.waitpid(pid, 0)
            except Exception:
                pass
    finally:
        pidf.release()


def main():
    global logger
    (opts, args) = parse_arguments(sys.argv[1:])

    logger = log.get_logger("synnefo.dispatcher")

    # Init the global variables containing the queues
    _init_queues()

    # Special case for the clean-up queues action
    if opts.purge_queues:
        purge_queues()
        return

    # Special case for the clean-up exchanges action
    if opts.purge_exchanges:
        purge_exchanges()
        return

    if opts.drain_queue:
        drain_queue(opts.drain_queue)
        return

    # Debug mode, process messages without spawning workers
    if opts.debug:
        log.console_output(logger)
        debug_mode()
        return

    # Redirect stdout and stderr to the fileno of the first
    # file-based handler for this logger
    stdout_stderr_handler = None
    files_preserve = None
    for handler in logger.handlers:
        if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
            stdout_stderr_handler = handler.stream
            files_preserve = [handler.stream]
            break

    daemon_context = daemon.DaemonContext(
        stdout=stdout_stderr_handler,
        stderr=stdout_stderr_handler,
        files_preserve=files_preserve,
        umask=022)

    daemon_context.open()

    # Catch every exception, make sure it gets logged properly
    try:
        daemon_mode(opts)
    except Exception:
        exc = "".join(traceback.format_exception(*sys.exc_info()))
        logger.critical(exc)
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :