Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 698d0666

History | View | Annotate | Download (13.5 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45
import synnefo.settings as settings
46
from synnefo.logic import log
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64

    
65
# Queue names
66
QUEUES = []
67

    
68
# Queue bindings to exchanges
69
BINDINGS = []
70

    
71
class Dispatcher:
72

    
73
    logger = None
74
    chan = None
75
    debug = False
76
    clienttags = []
77

    
78
    def __init__(self, debug = False):
79
        
80
        # Initialize logger
81
        self.logger = log.get_logger('synnefo.dispatcher')
82

    
83
        self.debug = debug
84
        self._init()
85

    
86
    def wait(self):
87
        while True:
88
            try:
89
                self.chan.wait()
90
            except SystemExit:
91
                break
92
            except amqp.exceptions.AMQPConnectionException:
93
                self.logger.error("Server went away, reconnecting...")
94
                self._init()
95
            except socket.error:
96
                self.logger.error("Server went away, reconnecting...")
97
                self._init()
98

    
99
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
100
        self.chan.connection.close()
101
        self.chan.close()
102

    
103
    def _init(self):
104
        global QUEUES, BINDINGS
105
        self.logger.info("Initializing")
106

    
107
        # Connect to RabbitMQ
108
        conn = None
109
        while conn == None:
110
            self.logger.info("Attempting to connect to %s",
111
                             settings.RABBIT_HOST)
112
            try:
113
                conn = amqp.Connection(host=settings.RABBIT_HOST,
114
                                       userid=settings.RABBIT_USERNAME,
115
                                       password=settings.RABBIT_PASSWORD,
116
                                       virtual_host=settings.RABBIT_VHOST)
117
            except socket.error:
118
                time.sleep(1)
119

    
120
        self.logger.info("Connection succesful, opening channel")
121
        self.chan = conn.channel()
122

    
123
        # Declare queues and exchanges
124
        for exchange in settings.EXCHANGES:
125
            self.chan.exchange_declare(exchange=exchange, type="topic",
126
                                       durable=True, auto_delete=False)
127

    
128
        for queue in QUEUES:
129
            self.chan.queue_declare(queue=queue, durable=True,
130
                                    exclusive=False, auto_delete=False)
131

    
132
        bindings = BINDINGS
133

    
134
        # Bind queues to handler methods
135
        for binding in bindings:
136
            try:
137
                callback = getattr(callbacks, binding[3])
138
            except AttributeError:
139
                self.logger.error("Cannot find callback %s" % binding[3])
140
                continue
141

    
142
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
143
                                 routing_key=binding[2])
144
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
145
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
146
                              (binding[1], binding[2], binding[0], binding[3]))
147
            self.clienttags.append(tag)
148

    
149

    
150
def _init_queues():
151
    global QUEUES, BINDINGS
152

    
153
    # Queue declarations
154
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
155

    
156
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
157
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
158
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
159
    QUEUE_EMAIL = "%s-email" % prefix
160
    QUEUE_RECONC = "%s-reconciliation" % prefix
161
    if settings.DEBUG is True:
162
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
163

    
164
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
165
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC)
166

    
167
    # notifications of type "ganeti-op-status"
168
    DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
169
    # notifications of type "ganeti-net-status"
170
    DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
171

    
172
    BINDINGS = [
173
    # Queue                   # Exchange                # RouteKey          # Handler
174
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,  'update_db'),
175
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
176
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',      'update_credits'),
177
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',        'send_email'),
178
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',        'send_email'),
179
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*', 'trigger_status_update'),
180
    ]
181

    
182
    if settings.DEBUG is True:
183
        BINDINGS += [
184
            # Queue       # Exchange          # RouteKey  # Handler
185
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
186
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
187
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
188
        ]
189
        QUEUES += (QUEUE_DEBUG,)
190

    
191

    
192
def _exit_handler(signum, frame):
193
    """"Catch exit signal in children processes."""
194
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
195
    raise SystemExit
196

    
197

    
198
def _parent_handler(signum, frame):
199
    """"Catch exit signal in parent process and forward it to children."""
200
    global children
201
    print "Caught signal %d, sending kill signal to children" % signum
202
    [os.kill(pid, SIGTERM) for pid in children]
203

    
204

    
205
def child(cmdline):
206
    """The context of the child process"""
207

    
208
    # Cmd line argument parsing
209
    (opts, args) = parse_arguments(cmdline)
210
    disp = Dispatcher(debug = opts.debug)
211

    
212
    # Start the event loop
213
    disp.wait()
214

    
215

    
216
def parse_arguments(args):
217
    from optparse import OptionParser
218

    
219
    parser = OptionParser()
220
    parser.add_option("-d", "--debug", action="store_true", default=False,
221
                      dest="debug", help="Enable debug mode")
222
    parser.add_option("-w", "--workers", default=2, dest="workers",
223
                      help="Number of workers to spawn", type="int")
224
    parser.add_option("-p", '--pid-file', dest="pid_file",
225
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
226
                      help="Save PID to file (default:%s)" %
227
                           os.path.join(os.getcwd(), "dispatcher.pid"))
228
    parser.add_option("--purge-queues", action="store_true",
229
                      default=False, dest="purge_queues",
230
                      help="Remove all declared queues (DANGEROUS!)")
231
    parser.add_option("--purge-exchanges", action="store_true",
232
                      default=False, dest="purge_exchanges",
233
                      help="Remove all exchanges. Implies deleting all queues \
234
                           first (DANGEROUS!)")
235
    parser.add_option("--drain-queue", dest="drain_queue",
236
                      help="Strips a queue from all outstanding messages")
237

    
238
    return parser.parse_args(args)
239

    
240

    
241
def purge_queues() :
242
    """
243
        Delete declared queues from RabbitMQ. Use with care!
244
    """
245
    global QUEUES, BINDINGS
246
    conn = get_connection()
247
    chan = conn.channel()
248

    
249
    print "Queues to be deleted: ", QUEUES
250

    
251
    if not get_user_confirmation():
252
        return
253

    
254
    for queue in QUEUES:
255
        try:
256
            chan.queue_delete(queue=queue)
257
            print "Deleting queue %s" % queue
258
        except amqp.exceptions.AMQPChannelException as e:
259
            print e.amqp_reply_code, " ", e.amqp_reply_text
260
            chan = conn.channel()
261

    
262
    chan.connection.close()
263

    
264

    
265
def purge_exchanges():
266
    """
267
        Delete declared exchanges from RabbitMQ, after removing all queues first
268
    """
269
    global QUEUES, BINDINGS
270
    purge_queues()
271

    
272
    conn = get_connection()
273
    chan = conn.channel()
274

    
275
    print "Exchnages to be deleted: ", settings.EXCHANGES
276

    
277
    if not get_user_confirmation():
278
        return
279

    
280
    for exchange in settings.EXCHANGES:
281
        try:
282
            chan.exchange_delete(exchange=exchange)
283
        except amqp.exceptions.AMQPChannelException as e:
284
            print e.amqp_reply_code, " ", e.amqp_reply_text
285

    
286
    chan.connection.close()
287

    
288

    
289
def drain_queue(queue):
290
    """
291
        Strip a (declared) queue from all outstanding messages
292
    """
293
    global QUEUES, BINDINGS
294
    if not queue:
295
        return
296

    
297
    if not queue in QUEUES:
298
        print "Queue %s not configured" % queue
299
        return
300

    
301
    print "Queue to be drained: %s" % queue
302

    
303
    if not get_user_confirmation():
304
        return
305
    conn = get_connection()
306
    chan = conn.channel()
307

    
308
    # Register a temporary queue binding
309
    for binding in BINDINGS:
310
        if binding[0] == queue:
311
            exch = binding[1]
312

    
313
    if not exch:
314
        print "Queue not bound to any exchange: %s" % queue
315
        return
316

    
317
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
318
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
319

    
320
    print "Queue draining about to start, hit Ctrl+c when done"
321
    time.sleep(2)
322
    print "Queue draining starting"
323

    
324
    signal(SIGTERM, _exit_handler)
325
    signal(SIGINT, _exit_handler)
326

    
327
    num_processed = 0
328
    while True:
329
        chan.wait()
330
        num_processed += 1
331
        sys.stderr.write("Ignored %d messages\r" % num_processed)
332

    
333
    chan.basic_cancel(tag)
334
    chan.connection.close()
335

    
336

    
337
def get_connection():
338
    conn = amqp.Connection( host=settings.RABBIT_HOST,
339
                        userid=settings.RABBIT_USERNAME,
340
                        password=settings.RABBIT_PASSWORD,
341
                        virtual_host=settings.RABBIT_VHOST)
342
    return conn
343

    
344

    
345
def get_user_confirmation():
346
    ans = raw_input("Are you sure (N/y):")
347

    
348
    if not ans:
349
        return False
350
    if ans not in ['Y', 'y']:
351
        return False
352
    return True
353

    
354

    
355
def debug_mode():
356
    disp = Dispatcher(debug = True)
357
    signal(SIGINT, _exit_handler)
358
    signal(SIGTERM, _exit_handler)
359

    
360
    disp.wait()
361

    
362

    
363
def main():
364
    global children, logger
365
    (opts, args) = parse_arguments(sys.argv[1:])
366

    
367
    logger = log.get_logger("synnefo.dispatcher")
368

    
369
    # Init the global variables containing the queues
370
    _init_queues()
371

    
372
    # Special case for the clean up queues action
373
    if opts.purge_queues:
374
        purge_queues()
375
        return
376

    
377
    # Special case for the clean up exch action
378
    if opts.purge_exchanges:
379
        purge_exchanges()
380
        return
381

    
382
    if opts.drain_queue:
383
        drain_queue(opts.drain_queue)
384
        return
385

    
386
    # Debug mode, process messages without spawning workers
387
    if opts.debug:
388
        debug_mode()
389
        return
390

    
391
    # Become a daemon
392
    daemon_context = daemon.DaemonContext(
393
        stdout=sys.stdout,
394
        stderr=sys.stderr,
395
        umask=022)
396

    
397
    daemon_context.open()
398

    
399
    # Create pidfile. Take care of differences between python-daemon versions.
400
    try:
401
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
402
    except:
403
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
404

    
405
    pidf.acquire()
406

    
407
    logger.info("Became a daemon")
408

    
409
    # Fork workers
410
    children = []
411

    
412
    i = 0
413
    while i < opts.workers:
414
        newpid = os.fork()
415

    
416
        if newpid == 0:
417
            signal(SIGINT,  _exit_handler)
418
            signal(SIGTERM, _exit_handler)
419
            child(sys.argv[1:])
420
            sys.exit(1)
421
        else:
422
            pids = (os.getpid(), newpid)
423
            logger.debug("%d, forked child: %d" % pids)
424
            children.append(pids[1])
425
        i += 1
426

    
427
    # Catch signals to ensure graceful shutdown
428
    signal(SIGINT,  _parent_handler)
429
    signal(SIGTERM, _parent_handler)
430

    
431
    # Wait for all children processes to die, one by one
432
    try :
433
        for pid in children:
434
            try:
435
                os.waitpid(pid, 0)
436
            except Exception:
437
                pass
438
    finally:
439
        pidf.release()
440

    
441
if __name__ == "__main__":
442
    sys.exit(main())
443

    
444
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :