Statistics
| Branch: | Tag: | Revision:

root / snf-app / synnefo / logic / dispatcher.py @ de470b1e

History | View | Annotate | Download (14.4 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45

    
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from amqplib import client_0_8 as amqp
50
from signal import signal, SIGINT, SIGTERM
51

    
52
import logging
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64
from synnefo.util.dictconfig import dictConfig
65

    
66

    
67
log = logging.getLogger()
68

    
69

    
70
# Queue names
71
QUEUES = []
72

    
73
# Queue bindings to exchanges
74
BINDINGS = []
75

    
76

    
77
class Dispatcher:
78
    chan = None
79
    debug = False
80
    clienttags = []
81

    
82
    def __init__(self, debug=False):
83
        self.debug = debug
84
        self._init()
85

    
86
    def wait(self):
87
        while True:
88
            try:
89
                self.chan.wait()
90
            except SystemExit:
91
                break
92
            except amqp.exceptions.AMQPConnectionException:
93
                log.error("Server went away, reconnecting...")
94
                self._init()
95
            except socket.error:
96
                log.error("Server went away, reconnecting...")
97
                self._init()
98
            except Exception, e:
99
                log.exception("Caught unexpected exception")
100

    
101
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
102
        self.chan.connection.close()
103
        self.chan.close()
104

    
105
    def _init(self):
106
        global QUEUES, BINDINGS
107
        log.info("Initializing")
108

    
109
        # Connect to RabbitMQ
110
        conn = None
111
        while conn == None:
112
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
113
            try:
114
                conn = amqp.Connection(host=settings.RABBIT_HOST,
115
                                       userid=settings.RABBIT_USERNAME,
116
                                       password=settings.RABBIT_PASSWORD,
117
                                       virtual_host=settings.RABBIT_VHOST)
118
            except socket.error:
119
                log.error("Failed to connect to %s, retrying in 10s",
120
                                  settings.RABBIT_HOST)
121
                time.sleep(10)
122

    
123
        log.info("Connection succesful, opening channel")
124
        self.chan = conn.channel()
125

    
126
        # Declare queues and exchanges
127
        for exchange in settings.EXCHANGES:
128
            self.chan.exchange_declare(exchange=exchange, type="topic",
129
                                       durable=True, auto_delete=False)
130

    
131
        for queue in QUEUES:
132
            self.chan.queue_declare(queue=queue, durable=True,
133
                                    exclusive=False, auto_delete=False)
134

    
135
        bindings = BINDINGS
136

    
137
        # Bind queues to handler methods
138
        for binding in bindings:
139
            try:
140
                callback = getattr(callbacks, binding[3])
141
            except AttributeError:
142
                log.error("Cannot find callback %s", binding[3])
143
                raise SystemExit(1)
144

    
145
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
146
                                 routing_key=binding[2])
147
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
148
            log.debug("Binding %s(%s) to queue %s with handler %s",
149
                              binding[1], binding[2], binding[0], binding[3])
150
            self.clienttags.append(tag)
151

    
152

    
153
def _init_queues():
154
    global QUEUES, BINDINGS
155

    
156
    # Queue declarations
157
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
158

    
159
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
160
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
161
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
162
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
163
    QUEUE_EMAIL = "%s-email" % prefix
164
    QUEUE_RECONC = "%s-reconciliation" % prefix
165
    if settings.DEBUG is True:
166
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
167

    
168
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
169
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
170
              QUEUE_GANETI_BUILD_PROGR)
171

    
172
    # notifications of type "ganeti-op-status"
173
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
174
    # notifications of type "ganeti-net-status"
175
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
176
    # notifications of type "ganeti-create-progress"
177
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
178
    # email
179
    EMAIL_HANDLER = 'logic.%s.email.*' % prefix
180
    # reconciliation
181
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
182

    
183
    BINDINGS = [
184
    # Queue                   # Exchange                # RouteKey              # Handler
185
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
186
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
187
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
188
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',          'update_credits'),
189
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    EMAIL_HANDLER,          'send_email'),
190
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   EMAIL_HANDLER,          'send_email'),
191
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
192
    ]
193

    
194
    if settings.DEBUG is True:
195
        BINDINGS += [
196
            # Queue       # Exchange          # RouteKey  # Handler
197
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
198
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
199
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
200
        ]
201
        QUEUES += (QUEUE_DEBUG,)
202

    
203

    
204
def _exit_handler(signum, frame):
205
    """"Catch exit signal in children processes"""
206
    log.info("Caught signal %d, will raise SystemExit", signum)
207
    raise SystemExit
208

    
209

    
210
def _parent_handler(signum, frame):
211
    """"Catch exit signal in parent process and forward it to children."""
212
    global children
213
    log.info("Caught signal %d, sending SIGTERM to children %s",
214
                signum, children)
215
    [os.kill(pid, SIGTERM) for pid in children]
216

    
217

    
218
def child(cmdline):
219
    """The context of the child process"""
220

    
221
    # Cmd line argument parsing
222
    (opts, args) = parse_arguments(cmdline)
223
    disp = Dispatcher(debug=opts.debug)
224

    
225
    # Start the event loop
226
    disp.wait()
227

    
228

    
229
def parse_arguments(args):
230
    from optparse import OptionParser
231

    
232
    parser = OptionParser()
233
    parser.add_option("-d", "--debug", action="store_true", default=False,
234
                      dest="debug", help="Enable debug mode")
235
    parser.add_option("-w", "--workers", default=2, dest="workers",
236
                      help="Number of workers to spawn", type="int")
237
    parser.add_option("-p", '--pid-file', dest="pid_file",
238
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
239
                      help="Save PID to file (default:%s)" %
240
                           os.path.join(os.getcwd(), "dispatcher.pid"))
241
    parser.add_option("--purge-queues", action="store_true",
242
                      default=False, dest="purge_queues",
243
                      help="Remove all declared queues (DANGEROUS!)")
244
    parser.add_option("--purge-exchanges", action="store_true",
245
                      default=False, dest="purge_exchanges",
246
                      help="Remove all exchanges. Implies deleting all queues \
247
                           first (DANGEROUS!)")
248
    parser.add_option("--drain-queue", dest="drain_queue",
249
                      help="Strips a queue from all outstanding messages")
250

    
251
    return parser.parse_args(args)
252

    
253

    
254
def purge_queues():
255
    """
256
        Delete declared queues from RabbitMQ. Use with care!
257
    """
258
    global QUEUES, BINDINGS
259
    conn = get_connection()
260
    chan = conn.channel()
261

    
262
    print "Queues to be deleted: ", QUEUES
263

    
264
    if not get_user_confirmation():
265
        return
266

    
267
    for queue in QUEUES:
268
        try:
269
            chan.queue_delete(queue=queue)
270
            print "Deleting queue %s" % queue
271
        except amqp.exceptions.AMQPChannelException as e:
272
            print e.amqp_reply_code, " ", e.amqp_reply_text
273
            chan = conn.channel()
274

    
275
    chan.connection.close()
276

    
277

    
278
def purge_exchanges():
279
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
280
    global QUEUES, BINDINGS
281
    purge_queues()
282

    
283
    conn = get_connection()
284
    chan = conn.channel()
285

    
286
    print "Exchanges to be deleted: ", settings.EXCHANGES
287

    
288
    if not get_user_confirmation():
289
        return
290

    
291
    for exchange in settings.EXCHANGES:
292
        try:
293
            chan.exchange_delete(exchange=exchange)
294
        except amqp.exceptions.AMQPChannelException as e:
295
            print e.amqp_reply_code, " ", e.amqp_reply_text
296

    
297
    chan.connection.close()
298

    
299

    
300
def drain_queue(queue):
301
    """Strip a (declared) queue from all outstanding messages"""
302
    global QUEUES, BINDINGS
303
    if not queue:
304
        return
305

    
306
    if not queue in QUEUES:
307
        print "Queue %s not configured" % queue
308
        return
309

    
310
    print "Queue to be drained: %s" % queue
311

    
312
    if not get_user_confirmation():
313
        return
314
    conn = get_connection()
315
    chan = conn.channel()
316

    
317
    # Register a temporary queue binding
318
    for binding in BINDINGS:
319
        if binding[0] == queue:
320
            exch = binding[1]
321

    
322
    if not exch:
323
        print "Queue not bound to any exchange: %s" % queue
324
        return
325

    
326
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
327
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
328

    
329
    print "Queue draining about to start, hit Ctrl+c when done"
330
    time.sleep(2)
331
    print "Queue draining starting"
332

    
333
    signal(SIGTERM, _exit_handler)
334
    signal(SIGINT, _exit_handler)
335

    
336
    num_processed = 0
337
    while True:
338
        chan.wait()
339
        num_processed += 1
340
        sys.stderr.write("Ignored %d messages\r" % num_processed)
341

    
342
    chan.basic_cancel(tag)
343
    chan.connection.close()
344

    
345

    
346
def get_connection():
347
    conn = amqp.Connection(host=settings.RABBIT_HOST,
348
                           userid=settings.RABBIT_USERNAME,
349
                           password=settings.RABBIT_PASSWORD,
350
                           virtual_host=settings.RABBIT_VHOST)
351
    return conn
352

    
353

    
354
def get_user_confirmation():
355
    ans = raw_input("Are you sure (N/y):")
356

    
357
    if not ans:
358
        return False
359
    if ans not in ['Y', 'y']:
360
        return False
361
    return True
362

    
363

    
364
def debug_mode():
365
    disp = Dispatcher(debug=True)
366
    signal(SIGINT, _exit_handler)
367
    signal(SIGTERM, _exit_handler)
368

    
369
    disp.wait()
370

    
371

    
372
def daemon_mode(opts):
373
    global children
374

    
375
    # Create pidfile,
376
    # take care of differences between python-daemon versions
377
    try:
378
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
379
    except:
380
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
381

    
382
    pidf.acquire()
383

    
384
    log.info("Became a daemon")
385

    
386
    # Fork workers
387
    children = []
388

    
389
    i = 0
390
    while i < opts.workers:
391
        newpid = os.fork()
392

    
393
        if newpid == 0:
394
            signal(SIGINT, _exit_handler)
395
            signal(SIGTERM, _exit_handler)
396
            child(sys.argv[1:])
397
            sys.exit(1)
398
        else:
399
            log.debug("%d, forked child: %d", os.getpid(), newpid)
400
            children.append(newpid)
401
        i += 1
402

    
403
    # Catch signals to ensure graceful shutdown
404
    signal(SIGINT, _parent_handler)
405
    signal(SIGTERM, _parent_handler)
406

    
407
    # Wait for all children processes to die, one by one
408
    try:
409
        for pid in children:
410
            try:
411
                os.waitpid(pid, 0)
412
            except Exception:
413
                pass
414
    finally:
415
        pidf.release()
416

    
417

    
418
def main():
419
    global log
420
    (opts, args) = parse_arguments(sys.argv[1:])
421

    
422
    # Init the global variables containing the queues
423
    _init_queues()
424

    
425
    # Special case for the clean up queues action
426
    if opts.purge_queues:
427
        purge_queues()
428
        return
429

    
430
    # Special case for the clean up exch action
431
    if opts.purge_exchanges:
432
        purge_exchanges()
433
        return
434

    
435
    if opts.drain_queue:
436
        drain_queue(opts.drain_queue)
437
        return
438

    
439
    # Debug mode, process messages without spawning workers
440
    if opts.debug:
441
        debug_mode()
442
        return
443

    
444
    files_preserve = []
445
    for handler in log.handlers:
446
        stream = getattr(handler, 'stream')
447
        if stream and hasattr(stream, 'fileno'):
448
            files_preserve.append(handler.stream)
449

    
450
    daemon_context = daemon.DaemonContext(
451
        files_preserve=files_preserve,
452
        umask=022)
453

    
454
    daemon_context.open()
455

    
456
    # Catch every exception, make sure it gets logged properly
457
    try:
458
        daemon_mode(opts)
459
    except Exception:
460
        log.exception("Unknown error")
461
        raise
462

    
463

    
464
def scriptmain():
465
    dictConfig(settings.DISPATCHER_LOGGING)
466
    sys.exit(main())
467

    
468
if __name__ == "__main__":
469
    scriptmain()
470

    
471
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :