Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 226f086a

History | View | Annotate | Download (13.7 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45
import synnefo.settings as settings
46
from synnefo.logic import log
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64

    
65
class Dispatcher:
66

    
67
    logger = None
68
    chan = None
69
    debug = False
70
    clienttags = []
71

    
72
    def __init__(self, debug = False):
73
        
74
        # Initialize logger
75
        self.logger = log.get_logger('synnefo.dispatcher')
76

    
77
        self.debug = debug
78
        self._init()
79

    
80
    def wait(self):
81
        while True:
82
            try:
83
                self.chan.wait()
84
            except SystemExit:
85
                break
86
            except amqp.exceptions.AMQPConnectionException:
87
                self.logger.error("Server went away, reconnecting...")
88
                self._init()
89
            except socket.error:
90
                self.logger.error("Server went away, reconnecting...")
91
                self._init()
92

    
93
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
94
        self.chan.connection.close()
95
        self.chan.close()
96

    
97
    def _init(self):
98
        self.logger.info("Initializing")
99

    
100
        # Queue declarations
101
        prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
102

    
103
        QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
104
        QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
105
        QUEUE_CRON_CREDITS = "%s-credits" % prefix
106
        QUEUE_EMAIL = "%s-email" % prefix
107
        QUEUE_RECONC = "%s-reconciliation" % prefix
108
        if settings.DEBUG is True:
109
            QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
110

    
111
        QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
112
                  QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC)
113

    
114
        if settings.DEBUG is True:
115
            BINDINGS_DEBUG = [
116
                # Queue       # Exchange          # RouteKey  # Handler
117
                (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
118
                (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
119
                (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
120
            ]
121
            QUEUES += (QUEUE_DEBUG,)
122

    
123
        # notifications of type "ganeti-op-status"
124
        DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
125
        # notifications of type "ganeti-net-status"
126
        DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
127

    
128
        BINDINGS = [
129
        # Queue                   # Exchange                # RouteKey          # Handler
130
        (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,  'update_db'),
131
        (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
132
        (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',      'update_credits'),
133
        (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',        'send_email'),
134
        (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',        'send_email'),
135
        (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*', 'trigger_status_update'),
136
        ]
137

    
138
        # Connect to RabbitMQ
139
        conn = None
140
        while conn == None:
141
            self.logger.info("Attempting to connect to %s",
142
                             settings.RABBIT_HOST)
143
            try:
144
                conn = amqp.Connection(host=settings.RABBIT_HOST,
145
                                       userid=settings.RABBIT_USERNAME,
146
                                       password=settings.RABBIT_PASSWORD,
147
                                       virtual_host=settings.RABBIT_VHOST)
148
            except socket.error:
149
                time.sleep(1)
150

    
151
        self.logger.info("Connection succesful, opening channel")
152
        self.chan = conn.channel()
153

    
154
        # Declare queues and exchanges
155
        for exchange in settings.EXCHANGES:
156
            self.chan.exchange_declare(exchange=exchange, type="topic",
157
                                       durable=True, auto_delete=False)
158

    
159
        for queue in QUEUES:
160
            self.chan.queue_declare(queue=queue, durable=True,
161
                                    exclusive=False, auto_delete=False)
162

    
163
        bindings = BINDINGS
164

    
165
        # Special queue for debugging, should not appear in production
166
        if self.debug and settings.DEBUG:
167
            self.chan.queue_declare(queue=QUEUE_DEBUG, durable=True,
168
                                    exclusive=False, auto_delete=False)
169
            bindings += BINDINGS_DEBUG
170

    
171
        # Bind queues to handler methods
172
        for binding in bindings:
173
            try:
174
                callback = getattr(callbacks, binding[3])
175
            except AttributeError:
176
                self.logger.error("Cannot find callback %s" % binding[3])
177
                continue
178

    
179
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
180
                                 routing_key=binding[2])
181
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
182
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
183
                              (binding[1], binding[2], binding[0], binding[3]))
184
            self.clienttags.append(tag)
185

    
186

    
187
def _exit_handler(signum, frame):
188
    """"Catch exit signal in children processes."""
189
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
190
    raise SystemExit
191

    
192

    
193
def _parent_handler(signum, frame):
194
    """"Catch exit signal in parent process and forward it to children."""
195
    global children
196
    print "Caught signal %d, sending kill signal to children" % signum
197
    [os.kill(pid, SIGTERM) for pid in children]
198

    
199

    
200
def child(cmdline):
201
    """The context of the child process"""
202

    
203
    # Cmd line argument parsing
204
    (opts, args) = parse_arguments(cmdline)
205
    disp = Dispatcher(debug = opts.debug)
206

    
207
    # Start the event loop
208
    disp.wait()
209

    
210

    
211
def parse_arguments(args):
212
    from optparse import OptionParser
213

    
214
    parser = OptionParser()
215
    parser.add_option("-d", "--debug", action="store_true", default=False,
216
                      dest="debug", help="Enable debug mode")
217
    parser.add_option("-w", "--workers", default=2, dest="workers",
218
                      help="Number of workers to spawn", type="int")
219
    parser.add_option("-p", '--pid-file', dest="pid_file",
220
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
221
                      help="Save PID to file (default:%s)" %
222
                           os.path.join(os.getcwd(), "dispatcher.pid"))
223
    parser.add_option("--purge-queues", action="store_true",
224
                      default=False, dest="purge_queues",
225
                      help="Remove all declared queues (DANGEROUS!)")
226
    parser.add_option("--purge-exchanges", action="store_true",
227
                      default=False, dest="purge_exchanges",
228
                      help="Remove all exchanges. Implies deleting all queues \
229
                           first (DANGEROUS!)")
230
    parser.add_option("--drain-queue", dest="drain_queue",
231
                      help="Strips a queue from all outstanding messages")
232

    
233
    return parser.parse_args(args)
234

    
235

    
236
def purge_queues() :
237
    """
238
        Delete declared queues from RabbitMQ. Use with care!
239
    """
240
    conn = get_connection()
241
    chan = conn.channel()
242

    
243
    print "Queues to be deleted: ",  settings.QUEUES
244

    
245
    if not get_user_confirmation():
246
        return
247

    
248
    for queue in settings.QUEUES:
249
        try:
250
            chan.queue_delete(queue=queue)
251
            print "Deleting queue %s" % queue
252
        except amqp.exceptions.AMQPChannelException as e:
253
            print e.amqp_reply_code, " ", e.amqp_reply_text
254
            chan = conn.channel()
255

    
256
    chan.connection.close()
257

    
258

    
259
def purge_exchanges():
260
    """
261
        Delete declared exchanges from RabbitMQ, after removing all queues first
262
    """
263
    purge_queues()
264

    
265
    conn = get_connection()
266
    chan = conn.channel()
267

    
268
    print "Exchnages to be deleted: ", settings.EXCHANGES
269

    
270
    if not get_user_confirmation():
271
        return
272

    
273
    for exchange in settings.EXCHANGES:
274
        try:
275
            chan.exchange_delete(exchange=exchange)
276
        except amqp.exceptions.AMQPChannelException as e:
277
            print e.amqp_reply_code, " ", e.amqp_reply_text
278

    
279
    chan.connection.close()
280

    
281

    
282
def drain_queue(queue):
283
    """
284
        Strip a (declared) queue from all outstanding messages
285
    """
286
    if not queue:
287
        return
288

    
289
    if not queue in settings.QUEUES:
290
        print "Queue %s not configured" % queue
291
        return
292

    
293
    print "Queue to be drained: %s" % queue
294

    
295
    if not get_user_confirmation():
296
        return
297
    conn = get_connection()
298
    chan = conn.channel()
299

    
300
    # Register a temporary queue binding
301
    for binding in settings.BINDINGS:
302
        if binding[0] == queue:
303
            exch = binding[1]
304

    
305
    if not exch:
306
        print "Queue not bound to any exchange: %s" % queue
307
        return
308

    
309
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
310
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
311

    
312
    print "Queue draining about to start, hit Ctrl+c when done"
313
    time.sleep(2)
314
    print "Queue draining starting"
315

    
316
    signal(SIGTERM, _exit_handler)
317
    signal(SIGINT, _exit_handler)
318

    
319
    num_processed = 0
320
    while True:
321
        chan.wait()
322
        num_processed += 1
323
        sys.stderr.write("Ignored %d messages\r" % num_processed)
324

    
325
    chan.basic_cancel(tag)
326
    chan.connection.close()
327

    
328

    
329
def get_connection():
330
    conn = amqp.Connection( host=settings.RABBIT_HOST,
331
                        userid=settings.RABBIT_USERNAME,
332
                        password=settings.RABBIT_PASSWORD,
333
                        virtual_host=settings.RABBIT_VHOST)
334
    return conn
335

    
336

    
337
def get_user_confirmation():
338
    ans = raw_input("Are you sure (N/y):")
339

    
340
    if not ans:
341
        return False
342
    if ans not in ['Y', 'y']:
343
        return False
344
    return True
345

    
346

    
347
def debug_mode():
348
    disp = Dispatcher(debug = True)
349
    signal(SIGINT, _exit_handler)
350
    signal(SIGTERM, _exit_handler)
351

    
352
    disp.wait()
353

    
354

    
355
def main():
356
    global children, logger
357
    (opts, args) = parse_arguments(sys.argv[1:])
358

    
359
    logger = log.get_logger("synnefo.dispatcher")
360

    
361
    # Special case for the clean up queues action
362
    if opts.purge_queues:
363
        purge_queues()
364
        return
365

    
366
    # Special case for the clean up exch action
367
    if opts.purge_exchanges:
368
        purge_exchanges()
369
        return
370

    
371
    if opts.drain_queue:
372
        drain_queue(opts.drain_queue)
373
        return
374

    
375
    # Debug mode, process messages without spawning workers
376
    if opts.debug:
377
        debug_mode()
378
        return
379

    
380
    # Become a daemon
381
    daemon_context = daemon.DaemonContext(
382
        stdout=sys.stdout,
383
        stderr=sys.stderr,
384
        umask=022)
385

    
386
    daemon_context.open()
387

    
388
    # Create pidfile. Take care of differences between python-daemon versions.
389
    try:
390
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
391
    except:
392
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
393

    
394
    pidf.acquire()
395

    
396
    logger.info("Became a daemon")
397

    
398
    # Fork workers
399
    children = []
400

    
401
    i = 0
402
    while i < opts.workers:
403
        newpid = os.fork()
404

    
405
        if newpid == 0:
406
            signal(SIGINT,  _exit_handler)
407
            signal(SIGTERM, _exit_handler)
408
            child(sys.argv[1:])
409
            sys.exit(1)
410
        else:
411
            pids = (os.getpid(), newpid)
412
            logger.debug("%d, forked child: %d" % pids)
413
            children.append(pids[1])
414
        i += 1
415

    
416
    # Catch signals to ensure graceful shutdown
417
    signal(SIGINT,  _parent_handler)
418
    signal(SIGTERM, _parent_handler)
419

    
420
    # Wait for all children processes to die, one by one
421
    try :
422
        for pid in children:
423
            try:
424
                os.waitpid(pid, 0)
425
            except Exception:
426
                pass
427
    finally:
428
        pidf.release()
429

    
430
if __name__ == "__main__":
431
    sys.exit(main())
432

    
433
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :