Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ ad74e62d

History | View | Annotate | Download (14 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45
import synnefo.settings as settings
46
from synnefo.logic import log
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64

    
65
# Queue names
66
QUEUES = []
67

    
68
# Queue bindings to exchanges
69
BINDINGS = []
70

    
71
class Dispatcher:
72

    
73
    logger = None
74
    chan = None
75
    debug = False
76
    clienttags = []
77

    
78
    def __init__(self, debug = False):
79
        
80
        # Initialize logger
81
        self.logger = log.get_logger('synnefo.dispatcher')
82

    
83
        self.debug = debug
84
        self._init()
85

    
86
    def wait(self):
87
        while True:
88
            try:
89
                self.chan.wait()
90
            except SystemExit:
91
                break
92
            except amqp.exceptions.AMQPConnectionException:
93
                self.logger.error("Server went away, reconnecting...")
94
                self._init()
95
            except socket.error:
96
                self.logger.error("Server went away, reconnecting...")
97
                self._init()
98
            except Exception, e:
99
                self.logger.exception("Caught unexpected exception")
100

    
101
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
102
        self.chan.connection.close()
103
        self.chan.close()
104

    
105
    def _init(self):
106
        global QUEUES, BINDINGS
107
        self.logger.info("Initializing")
108

    
109
        # Connect to RabbitMQ
110
        conn = None
111
        while conn == None:
112
            self.logger.info("Attempting to connect to %s",
113
                             settings.RABBIT_HOST)
114
            try:
115
                conn = amqp.Connection(host=settings.RABBIT_HOST,
116
                                       userid=settings.RABBIT_USERNAME,
117
                                       password=settings.RABBIT_PASSWORD,
118
                                       virtual_host=settings.RABBIT_VHOST)
119
            except socket.error:
120
                time.sleep(1)
121

    
122
        self.logger.info("Connection succesful, opening channel")
123
        self.chan = conn.channel()
124

    
125
        # Declare queues and exchanges
126
        for exchange in settings.EXCHANGES:
127
            self.chan.exchange_declare(exchange=exchange, type="topic",
128
                                       durable=True, auto_delete=False)
129

    
130
        for queue in QUEUES:
131
            self.chan.queue_declare(queue=queue, durable=True,
132
                                    exclusive=False, auto_delete=False)
133

    
134
        bindings = BINDINGS
135

    
136
        # Bind queues to handler methods
137
        for binding in bindings:
138
            try:
139
                callback = getattr(callbacks, binding[3])
140
            except AttributeError:
141
                self.logger.error("Cannot find callback %s" % binding[3])
142
                raise SystemExit(1)
143

    
144
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
145
                                 routing_key=binding[2])
146
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
147
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
148
                              (binding[1], binding[2], binding[0], binding[3]))
149
            self.clienttags.append(tag)
150

    
151

    
152
def _init_queues():
153
    global QUEUES, BINDINGS
154

    
155
    # Queue declarations
156
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
157

    
158
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
159
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
160
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
161
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
162
    QUEUE_EMAIL = "%s-email" % prefix
163
    QUEUE_RECONC = "%s-reconciliation" % prefix
164
    if settings.DEBUG is True:
165
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
166

    
167
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
168
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
169
              QUEUE_GANETI_BUILD_PROGR)
170

    
171
    # notifications of type "ganeti-op-status"
172
    DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
173
    # notifications of type "ganeti-net-status"
174
    DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
175
    # notifications of type "ganeti-create-progress"
176
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
177

    
178
    BINDINGS = [
179
    # Queue                   # Exchange                # RouteKey              # Handler
180
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
181
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
182
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
183
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',          'update_credits'),
184
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',            'send_email'),
185
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',            'send_email'),
186
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*',     'trigger_status_update'),
187
    ]
188

    
189
    if settings.DEBUG is True:
190
        BINDINGS += [
191
            # Queue       # Exchange          # RouteKey  # Handler
192
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
193
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
194
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
195
        ]
196
        QUEUES += (QUEUE_DEBUG,)
197

    
198

    
199
def _exit_handler(signum, frame):
200
    """"Catch exit signal in children processes."""
201
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
202
    raise SystemExit
203

    
204

    
205
def _parent_handler(signum, frame):
206
    """"Catch exit signal in parent process and forward it to children."""
207
    global children
208
    print "Caught signal %d, sending kill signal to children" % signum
209
    [os.kill(pid, SIGTERM) for pid in children]
210

    
211

    
212
def child(cmdline):
213
    """The context of the child process"""
214

    
215
    # Cmd line argument parsing
216
    (opts, args) = parse_arguments(cmdline)
217
    disp = Dispatcher(debug = opts.debug)
218

    
219
    # Start the event loop
220
    disp.wait()
221

    
222

    
223
def parse_arguments(args):
224
    from optparse import OptionParser
225

    
226
    parser = OptionParser()
227
    parser.add_option("-d", "--debug", action="store_true", default=False,
228
                      dest="debug", help="Enable debug mode")
229
    parser.add_option("-w", "--workers", default=2, dest="workers",
230
                      help="Number of workers to spawn", type="int")
231
    parser.add_option("-p", '--pid-file', dest="pid_file",
232
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
233
                      help="Save PID to file (default:%s)" %
234
                           os.path.join(os.getcwd(), "dispatcher.pid"))
235
    parser.add_option("--purge-queues", action="store_true",
236
                      default=False, dest="purge_queues",
237
                      help="Remove all declared queues (DANGEROUS!)")
238
    parser.add_option("--purge-exchanges", action="store_true",
239
                      default=False, dest="purge_exchanges",
240
                      help="Remove all exchanges. Implies deleting all queues \
241
                           first (DANGEROUS!)")
242
    parser.add_option("--drain-queue", dest="drain_queue",
243
                      help="Strips a queue from all outstanding messages")
244

    
245
    return parser.parse_args(args)
246

    
247

    
248
def purge_queues() :
249
    """
250
        Delete declared queues from RabbitMQ. Use with care!
251
    """
252
    global QUEUES, BINDINGS
253
    conn = get_connection()
254
    chan = conn.channel()
255

    
256
    print "Queues to be deleted: ", QUEUES
257

    
258
    if not get_user_confirmation():
259
        return
260

    
261
    for queue in QUEUES:
262
        try:
263
            chan.queue_delete(queue=queue)
264
            print "Deleting queue %s" % queue
265
        except amqp.exceptions.AMQPChannelException as e:
266
            print e.amqp_reply_code, " ", e.amqp_reply_text
267
            chan = conn.channel()
268

    
269
    chan.connection.close()
270

    
271

    
272
def purge_exchanges():
273
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
274
    global QUEUES, BINDINGS
275
    purge_queues()
276

    
277
    conn = get_connection()
278
    chan = conn.channel()
279

    
280
    print "Exchanges to be deleted: ", settings.EXCHANGES
281

    
282
    if not get_user_confirmation():
283
        return
284

    
285
    for exchange in settings.EXCHANGES:
286
        try:
287
            chan.exchange_delete(exchange=exchange)
288
        except amqp.exceptions.AMQPChannelException as e:
289
            print e.amqp_reply_code, " ", e.amqp_reply_text
290

    
291
    chan.connection.close()
292

    
293

    
294
def drain_queue(queue):
295
    """Strip a (declared) queue from all outstanding messages"""
296
    global QUEUES, BINDINGS
297
    if not queue:
298
        return
299

    
300
    if not queue in QUEUES:
301
        print "Queue %s not configured" % queue
302
        return
303

    
304
    print "Queue to be drained: %s" % queue
305

    
306
    if not get_user_confirmation():
307
        return
308
    conn = get_connection()
309
    chan = conn.channel()
310

    
311
    # Register a temporary queue binding
312
    for binding in BINDINGS:
313
        if binding[0] == queue:
314
            exch = binding[1]
315

    
316
    if not exch:
317
        print "Queue not bound to any exchange: %s" % queue
318
        return
319

    
320
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
321
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
322

    
323
    print "Queue draining about to start, hit Ctrl+c when done"
324
    time.sleep(2)
325
    print "Queue draining starting"
326

    
327
    signal(SIGTERM, _exit_handler)
328
    signal(SIGINT, _exit_handler)
329

    
330
    num_processed = 0
331
    while True:
332
        chan.wait()
333
        num_processed += 1
334
        sys.stderr.write("Ignored %d messages\r" % num_processed)
335

    
336
    chan.basic_cancel(tag)
337
    chan.connection.close()
338

    
339

    
340
def get_connection():
341
    conn = amqp.Connection(host=settings.RABBIT_HOST,
342
                           userid=settings.RABBIT_USERNAME,
343
                           password=settings.RABBIT_PASSWORD,
344
                           virtual_host=settings.RABBIT_VHOST)
345
    return conn
346

    
347

    
348
def get_user_confirmation():
349
    ans = raw_input("Are you sure (N/y):")
350

    
351
    if not ans:
352
        return False
353
    if ans not in ['Y', 'y']:
354
        return False
355
    return True
356

    
357

    
358
def debug_mode():
359
    disp = Dispatcher(debug = True)
360
    signal(SIGINT, _exit_handler)
361
    signal(SIGTERM, _exit_handler)
362

    
363
    disp.wait()
364

    
365

    
366
def main():
367
    global children, logger
368
    (opts, args) = parse_arguments(sys.argv[1:])
369

    
370
    logger = log.get_logger("synnefo.dispatcher")
371

    
372
    # Init the global variables containing the queues
373
    _init_queues()
374

    
375
    # Special case for the clean up queues action
376
    if opts.purge_queues:
377
        purge_queues()
378
        return
379

    
380
    # Special case for the clean up exch action
381
    if opts.purge_exchanges:
382
        purge_exchanges()
383
        return
384

    
385
    if opts.drain_queue:
386
        drain_queue(opts.drain_queue)
387
        return
388

    
389
    # Debug mode, process messages without spawning workers
390
    if opts.debug:
391
        log.console_output(logger)
392
        debug_mode()
393
        return
394

    
395
    # Become a daemon
396
    daemon_context = daemon.DaemonContext(
397
        stdout=sys.stdout,
398
        stderr=sys.stderr,
399
        umask=022)
400

    
401
    daemon_context.open()
402

    
403
    # Create pidfile. Take care of differences between python-daemon versions.
404
    try:
405
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
406
    except:
407
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
408

    
409
    pidf.acquire()
410

    
411
    logger.info("Became a daemon")
412

    
413
    # Fork workers
414
    children = []
415

    
416
    i = 0
417
    while i < opts.workers:
418
        newpid = os.fork()
419

    
420
        if newpid == 0:
421
            signal(SIGINT,  _exit_handler)
422
            signal(SIGTERM, _exit_handler)
423
            child(sys.argv[1:])
424
            sys.exit(1)
425
        else:
426
            pids = (os.getpid(), newpid)
427
            logger.debug("%d, forked child: %d" % pids)
428
            children.append(pids[1])
429
        i += 1
430

    
431
    # Catch signals to ensure graceful shutdown
432
    signal(SIGINT,  _parent_handler)
433
    signal(SIGTERM, _parent_handler)
434

    
435
    # Wait for all children processes to die, one by one
436
    try :
437
        for pid in children:
438
            try:
439
                os.waitpid(pid, 0)
440
            except Exception:
441
                pass
442
    finally:
443
        pidf.release()
444

    
445
if __name__ == "__main__":
446
    sys.exit(main())
447

    
448
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :