Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 9068cd85

History | View | Annotate | Download (13.8 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45
import synnefo.settings as settings
46
from synnefo.logic import log
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64

    
65
# Queue names
66
QUEUES = []
67

    
68
# Queue bindings to exchanges
69
BINDINGS = []
70

    
71
class Dispatcher:
72

    
73
    logger = None
74
    chan = None
75
    debug = False
76
    clienttags = []
77

    
78
    def __init__(self, debug = False):
79
        
80
        # Initialize logger
81
        self.logger = log.get_logger('synnefo.dispatcher')
82

    
83
        self.debug = debug
84
        self._init()
85

    
86
    def wait(self):
87
        while True:
88
            try:
89
                self.chan.wait()
90
            except SystemExit:
91
                break
92
            except amqp.exceptions.AMQPConnectionException:
93
                self.logger.error("Server went away, reconnecting...")
94
                self._init()
95
            except socket.error:
96
                self.logger.error("Server went away, reconnecting...")
97
                self._init()
98

    
99
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
100
        self.chan.connection.close()
101
        self.chan.close()
102

    
103
    def _init(self):
104
        global QUEUES, BINDINGS
105
        self.logger.info("Initializing")
106

    
107
        # Connect to RabbitMQ
108
        conn = None
109
        while conn == None:
110
            self.logger.info("Attempting to connect to %s",
111
                             settings.RABBIT_HOST)
112
            try:
113
                conn = amqp.Connection(host=settings.RABBIT_HOST,
114
                                       userid=settings.RABBIT_USERNAME,
115
                                       password=settings.RABBIT_PASSWORD,
116
                                       virtual_host=settings.RABBIT_VHOST)
117
            except socket.error:
118
                time.sleep(1)
119

    
120
        self.logger.info("Connection succesful, opening channel")
121
        self.chan = conn.channel()
122

    
123
        # Declare queues and exchanges
124
        for exchange in settings.EXCHANGES:
125
            self.chan.exchange_declare(exchange=exchange, type="topic",
126
                                       durable=True, auto_delete=False)
127

    
128
        for queue in QUEUES:
129
            self.chan.queue_declare(queue=queue, durable=True,
130
                                    exclusive=False, auto_delete=False)
131

    
132
        bindings = BINDINGS
133

    
134
        # Bind queues to handler methods
135
        for binding in bindings:
136
            try:
137
                callback = getattr(callbacks, binding[3])
138
            except AttributeError:
139
                self.logger.error("Cannot find callback %s" % binding[3])
140
                continue
141

    
142
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
143
                                 routing_key=binding[2])
144
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
145
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
146
                              (binding[1], binding[2], binding[0], binding[3]))
147
            self.clienttags.append(tag)
148

    
149

    
150
def _init_queues():
151
    global QUEUES, BINDINGS
152

    
153
    # Queue declarations
154
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
155

    
156
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
157
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
158
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
159
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
160
    QUEUE_EMAIL = "%s-email" % prefix
161
    QUEUE_RECONC = "%s-reconciliation" % prefix
162
    if settings.DEBUG is True:
163
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
164

    
165
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
166
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
167
              QUEUE_GANETI_BUILD_PROGR)
168

    
169
    # notifications of type "ganeti-op-status"
170
    DB_HANDLER_KEY_OP ='ganeti.%s.event.op' % prefix
171
    # notifications of type "ganeti-net-status"
172
    DB_HANDLER_KEY_NET ='ganeti.%s.event.net' % prefix
173
    # Build process monitoring event
174
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' %prefix
175

    
176
    BINDINGS = [
177
    # Queue                   # Exchange                # RouteKey              # Handler
178
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
179
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
180
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
181
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',          'update_credits'),
182
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    '*.email.*',            'send_email'),
183
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   '*.email.*',            'send_email'),
184
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   'reconciliation.*',     'trigger_status_update'),
185
    ]
186

    
187
    if settings.DEBUG is True:
188
        BINDINGS += [
189
            # Queue       # Exchange          # RouteKey  # Handler
190
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
191
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
192
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
193
        ]
194
        QUEUES += (QUEUE_DEBUG,)
195

    
196

    
197
def _exit_handler(signum, frame):
198
    """"Catch exit signal in children processes."""
199
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
200
    raise SystemExit
201

    
202

    
203
def _parent_handler(signum, frame):
204
    """"Catch exit signal in parent process and forward it to children."""
205
    global children
206
    print "Caught signal %d, sending kill signal to children" % signum
207
    [os.kill(pid, SIGTERM) for pid in children]
208

    
209

    
210
def child(cmdline):
211
    """The context of the child process"""
212

    
213
    # Cmd line argument parsing
214
    (opts, args) = parse_arguments(cmdline)
215
    disp = Dispatcher(debug = opts.debug)
216

    
217
    # Start the event loop
218
    disp.wait()
219

    
220

    
221
def parse_arguments(args):
222
    from optparse import OptionParser
223

    
224
    parser = OptionParser()
225
    parser.add_option("-d", "--debug", action="store_true", default=False,
226
                      dest="debug", help="Enable debug mode")
227
    parser.add_option("-w", "--workers", default=2, dest="workers",
228
                      help="Number of workers to spawn", type="int")
229
    parser.add_option("-p", '--pid-file', dest="pid_file",
230
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
231
                      help="Save PID to file (default:%s)" %
232
                           os.path.join(os.getcwd(), "dispatcher.pid"))
233
    parser.add_option("--purge-queues", action="store_true",
234
                      default=False, dest="purge_queues",
235
                      help="Remove all declared queues (DANGEROUS!)")
236
    parser.add_option("--purge-exchanges", action="store_true",
237
                      default=False, dest="purge_exchanges",
238
                      help="Remove all exchanges. Implies deleting all queues \
239
                           first (DANGEROUS!)")
240
    parser.add_option("--drain-queue", dest="drain_queue",
241
                      help="Strips a queue from all outstanding messages")
242

    
243
    return parser.parse_args(args)
244

    
245

    
246
def purge_queues() :
247
    """
248
        Delete declared queues from RabbitMQ. Use with care!
249
    """
250
    global QUEUES, BINDINGS
251
    conn = get_connection()
252
    chan = conn.channel()
253

    
254
    print "Queues to be deleted: ", QUEUES
255

    
256
    if not get_user_confirmation():
257
        return
258

    
259
    for queue in QUEUES:
260
        try:
261
            chan.queue_delete(queue=queue)
262
            print "Deleting queue %s" % queue
263
        except amqp.exceptions.AMQPChannelException as e:
264
            print e.amqp_reply_code, " ", e.amqp_reply_text
265
            chan = conn.channel()
266

    
267
    chan.connection.close()
268

    
269

    
270
def purge_exchanges():
271
    """
272
        Delete declared exchanges from RabbitMQ, after removing all queues first
273
    """
274
    global QUEUES, BINDINGS
275
    purge_queues()
276

    
277
    conn = get_connection()
278
    chan = conn.channel()
279

    
280
    print "Exchnages to be deleted: ", settings.EXCHANGES
281

    
282
    if not get_user_confirmation():
283
        return
284

    
285
    for exchange in settings.EXCHANGES:
286
        try:
287
            chan.exchange_delete(exchange=exchange)
288
        except amqp.exceptions.AMQPChannelException as e:
289
            print e.amqp_reply_code, " ", e.amqp_reply_text
290

    
291
    chan.connection.close()
292

    
293

    
294
def drain_queue(queue):
295
    """
296
        Strip a (declared) queue from all outstanding messages
297
    """
298
    global QUEUES, BINDINGS
299
    if not queue:
300
        return
301

    
302
    if not queue in QUEUES:
303
        print "Queue %s not configured" % queue
304
        return
305

    
306
    print "Queue to be drained: %s" % queue
307

    
308
    if not get_user_confirmation():
309
        return
310
    conn = get_connection()
311
    chan = conn.channel()
312

    
313
    # Register a temporary queue binding
314
    for binding in BINDINGS:
315
        if binding[0] == queue:
316
            exch = binding[1]
317

    
318
    if not exch:
319
        print "Queue not bound to any exchange: %s" % queue
320
        return
321

    
322
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
323
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
324

    
325
    print "Queue draining about to start, hit Ctrl+c when done"
326
    time.sleep(2)
327
    print "Queue draining starting"
328

    
329
    signal(SIGTERM, _exit_handler)
330
    signal(SIGINT, _exit_handler)
331

    
332
    num_processed = 0
333
    while True:
334
        chan.wait()
335
        num_processed += 1
336
        sys.stderr.write("Ignored %d messages\r" % num_processed)
337

    
338
    chan.basic_cancel(tag)
339
    chan.connection.close()
340

    
341

    
342
def get_connection():
343
    conn = amqp.Connection( host=settings.RABBIT_HOST,
344
                        userid=settings.RABBIT_USERNAME,
345
                        password=settings.RABBIT_PASSWORD,
346
                        virtual_host=settings.RABBIT_VHOST)
347
    return conn
348

    
349

    
350
def get_user_confirmation():
351
    ans = raw_input("Are you sure (N/y):")
352

    
353
    if not ans:
354
        return False
355
    if ans not in ['Y', 'y']:
356
        return False
357
    return True
358

    
359

    
360
def debug_mode():
361
    disp = Dispatcher(debug = True)
362
    signal(SIGINT, _exit_handler)
363
    signal(SIGTERM, _exit_handler)
364

    
365
    disp.wait()
366

    
367

    
368
def main():
369
    global children, logger
370
    (opts, args) = parse_arguments(sys.argv[1:])
371

    
372
    logger = log.get_logger("synnefo.dispatcher")
373

    
374
    # Init the global variables containing the queues
375
    _init_queues()
376

    
377
    # Special case for the clean up queues action
378
    if opts.purge_queues:
379
        purge_queues()
380
        return
381

    
382
    # Special case for the clean up exch action
383
    if opts.purge_exchanges:
384
        purge_exchanges()
385
        return
386

    
387
    if opts.drain_queue:
388
        drain_queue(opts.drain_queue)
389
        return
390

    
391
    # Debug mode, process messages without spawning workers
392
    if opts.debug:
393
        debug_mode()
394
        return
395

    
396
    # Become a daemon
397
    daemon_context = daemon.DaemonContext(
398
        stdout=sys.stdout,
399
        stderr=sys.stderr,
400
        umask=022)
401

    
402
    daemon_context.open()
403

    
404
    # Create pidfile. Take care of differences between python-daemon versions.
405
    try:
406
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
407
    except:
408
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
409

    
410
    pidf.acquire()
411

    
412
    logger.info("Became a daemon")
413

    
414
    # Fork workers
415
    children = []
416

    
417
    i = 0
418
    while i < opts.workers:
419
        newpid = os.fork()
420

    
421
        if newpid == 0:
422
            signal(SIGINT,  _exit_handler)
423
            signal(SIGTERM, _exit_handler)
424
            child(sys.argv[1:])
425
            sys.exit(1)
426
        else:
427
            pids = (os.getpid(), newpid)
428
            logger.debug("%d, forked child: %d" % pids)
429
            children.append(pids[1])
430
        i += 1
431

    
432
    # Catch signals to ensure graceful shutdown
433
    signal(SIGINT,  _parent_handler)
434
    signal(SIGTERM, _parent_handler)
435

    
436
    # Wait for all children processes to die, one by one
437
    try :
438
        for pid in children:
439
            try:
440
                os.waitpid(pid, 0)
441
            except Exception:
442
                pass
443
    finally:
444
        pidf.release()
445

    
446
if __name__ == "__main__":
447
    sys.exit(main())
448

    
449
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :