snf-cyclades-app/synnefo/logic/dispatcher.py @ 1ed37c1d

#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#   1. Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.


""" Message queue setup, dispatch and admin

This program sets up connections to the queues configured in settings.py
and implements the message wait and dispatch loops. Actual messages are
handled in the dispatched functions.

"""

from django.core.management import setup_environ

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)

from synnefo import settings
setup_environ(settings)

from amqplib import client_0_8 as amqp
from signal import signal, SIGINT, SIGTERM

import logging
import time
import socket
from daemon import daemon

# Take care of differences between python-daemon versions.
try:
    from daemon import pidfile
except ImportError:
    from daemon import pidlockfile

from synnefo.logic import callbacks
from synnefo.util.dictconfig import dictConfig


log = logging.getLogger()


# Queue names
QUEUES = []

# Queue bindings to exchanges
BINDINGS = []
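# Each entry that _init_queues() puts in BINDINGS is a 4-tuple of
# (queue, exchange, routing key, callback name); the callback name is resolved
# against synnefo.logic.callbacks when Dispatcher._init() binds the queues.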


class Dispatcher:
    chan = None
    debug = False
    clienttags = []

    def __init__(self, debug=False):
        self.debug = debug
        self._init()

    def wait(self):
        while True:
            try:
                self.chan.wait()
            except SystemExit:
                break
            except amqp.exceptions.AMQPConnectionException:
                log.error("Server went away, reconnecting...")
                self._init()
            except socket.error:
                log.error("Server went away, reconnecting...")
                self._init()
            except Exception:
                log.exception("Caught unexpected exception")

        for clienttag in self.clienttags:
            self.chan.basic_cancel(clienttag)
        self.chan.connection.close()
        self.chan.close()

    def _init(self):
        global QUEUES, BINDINGS
        log.info("Initializing")

        # Connect to RabbitMQ
        conn = None
        while conn is None:
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                log.error("Failed to connect to %s, retrying in 10s",
                          settings.RABBIT_HOST)
                time.sleep(10)

        log.info("Connection successful, opening channel")
        self.chan = conn.channel()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.chan.exchange_declare(exchange=exchange, type="topic",
                                       durable=True, auto_delete=False)

        for queue in QUEUES:
            self.chan.queue_declare(queue=queue, durable=True,
                                    exclusive=False, auto_delete=False)

        bindings = BINDINGS

        # Bind queues to handler methods
        for binding in bindings:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                log.error("Cannot find callback %s", binding[3])
                raise SystemExit(1)

            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
                                 routing_key=binding[2])
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
            log.debug("Binding %s(%s) to queue %s with handler %s",
                      binding[1], binding[2], binding[0], binding[3])
            self.clienttags.append(tag)


def _init_queues():
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
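    # For example, with a (hypothetical) BACKEND_PREFIX_ID of "snf-1" the
    # prefix is "snf" and the queues below become "snf-events-op",
    # "snf-events-net", "snf-events-progress", "snf-email" and
    # "snf-reconciliation".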

    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
    QUEUE_EMAIL = "%s-email" % prefix
    QUEUE_RECONC = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages

    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
              QUEUE_EMAIL, QUEUE_RECONC, QUEUE_GANETI_BUILD_PROGR)

    # notifications of type "ganeti-op-status"
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
    # notifications of type "ganeti-net-status"
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
    # notifications of type "ganeti-create-progress"
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
    # email
    EMAIL_HANDLER = 'logic.%s.email.*' % prefix
    # reconciliation
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix

    BINDINGS = [
    # Queue                   # Exchange                # RouteKey              # Handler
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    EMAIL_HANDLER,          'send_email'),
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   EMAIL_HANDLER,          'send_email'),
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue       # Exchange          # RouteKey  # Handler
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
        ]
        QUEUES += (QUEUE_DEBUG,)


def _exit_handler(signum, frame):
    """Catch exit signals in child processes."""
    log.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit


def _parent_handler(signum, frame):
    """Catch exit signals in the parent process and forward them to the children."""
    global children
    log.info("Caught signal %d, sending SIGTERM to children %s",
             signum, children)
    for pid in children:
        os.kill(pid, SIGTERM)


def child(cmdline):
    """The context of the child process"""

    # Cmd line argument parsing
    (opts, args) = parse_arguments(cmdline)
    disp = Dispatcher(debug=opts.debug)

    # Start the event loop
    disp.wait()


def parse_arguments(args):
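    """Parse command-line options for the dispatcher.

    Illustrative invocations (paths and queue names are hypothetical):

        ./dispatcher.py -w 4 -p /var/run/synnefo/dispatcher.pid
        ./dispatcher.py --drain-queue snf-email
        ./dispatcher.py -d     # debug mode: single process, no daemonization
    """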
    from optparse import OptionParser

    default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid")
    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", "--pid-file", dest="pid_file",
                      default=default_pid_file,
                      help="Save PID to file (default: %s)" % default_pid_file)
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Strip all outstanding messages from a queue")

    return parser.parse_args(args)


def purge_queues():
    """Delete declared queues from RabbitMQ. Use with care!"""
    global QUEUES, BINDINGS
    conn = get_connection()
    chan = conn.channel()

    print "Queues to be deleted: ", QUEUES

    if not get_user_confirmation():
        return

    for queue in QUEUES:
        try:
            chan.queue_delete(queue=queue)
            print "Deleted queue %s" % queue
        except amqp.exceptions.AMQPChannelException as e:
            print e.amqp_reply_code, " ", e.amqp_reply_text
            chan = conn.channel()

    chan.connection.close()


def purge_exchanges():
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
    global QUEUES, BINDINGS
    purge_queues()

    conn = get_connection()
    chan = conn.channel()

    print "Exchanges to be deleted: ", settings.EXCHANGES

    if not get_user_confirmation():
        return

    for exchange in settings.EXCHANGES:
        try:
            chan.exchange_delete(exchange=exchange)
        except amqp.exceptions.AMQPChannelException as e:
            print e.amqp_reply_code, " ", e.amqp_reply_text

    chan.connection.close()


def drain_queue(queue):
    """Strip a (declared) queue of all outstanding messages"""
    global QUEUES, BINDINGS
    if not queue:
        return

    if queue not in QUEUES:
        print "Queue %s not configured" % queue
        return

    print "Queue to be drained: %s" % queue

    if not get_user_confirmation():
        return
    conn = get_connection()
    chan = conn.channel()

    # Register a temporary queue binding
    exch = None
    for binding in BINDINGS:
        if binding[0] == queue:
            exch = binding[1]

    if not exch:
        print "Queue not bound to any exchange: %s" % queue
        return

    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)

    print "Queue draining about to start, hit Ctrl+c when done"
    time.sleep(2)
    print "Queue draining starting"

    signal(SIGTERM, _exit_handler)
    signal(SIGINT, _exit_handler)

    num_processed = 0
    while True:
        try:
            chan.wait()
        except SystemExit:
            break
        num_processed += 1
        sys.stderr.write("Ignored %d messages\r" % num_processed)

    chan.basic_cancel(tag)
    chan.connection.close()


def get_connection():
    conn = amqp.Connection(host=settings.RABBIT_HOST,
                           userid=settings.RABBIT_USERNAME,
                           password=settings.RABBIT_PASSWORD,
                           virtual_host=settings.RABBIT_VHOST)
    return conn


def get_user_confirmation():
    ans = raw_input("Are you sure (N/y):")

    if not ans:
        return False
    if ans not in ['Y', 'y']:
        return False
    return True


def debug_mode():
    disp = Dispatcher(debug=True)
    signal(SIGINT, _exit_handler)
    signal(SIGTERM, _exit_handler)

    disp.wait()


def daemon_mode(opts):
    global children

    # Create pidfile,
    # take care of differences between python-daemon versions
    try:
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
    except:
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    pidf.acquire()

    log.info("Became a daemon")

    # Fork workers
    children = []

    i = 0
    while i < opts.workers:
        newpid = os.fork()

        if newpid == 0:
            signal(SIGINT, _exit_handler)
            signal(SIGTERM, _exit_handler)
            child(sys.argv[1:])
            sys.exit(1)
        else:
            log.debug("%d, forked child: %d", os.getpid(), newpid)
            children.append(newpid)
        i += 1

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, _parent_handler)
    signal(SIGTERM, _parent_handler)

    # Wait for all child processes to die, one by one
    try:
        for pid in children:
            try:
                os.waitpid(pid, 0)
            except Exception:
                pass
    finally:
        pidf.release()


def main():
    dictConfig(settings.DISPATCHER_LOGGING)

    global log

    (opts, args) = parse_arguments(sys.argv[1:])

    # Init the global variables containing the queues
    _init_queues()

    # Special case for the clean up queues action
    if opts.purge_queues:
        purge_queues()
        return

    # Special case for the clean up exchanges action
    if opts.purge_exchanges:
        purge_exchanges()
        return

    if opts.drain_queue:
        drain_queue(opts.drain_queue)
        return

    # Debug mode, process messages without spawning workers
    if opts.debug:
        debug_mode()
        return

    files_preserve = []
    for handler in log.handlers:
        stream = getattr(handler, 'stream', None)
        if stream and hasattr(stream, 'fileno'):
            files_preserve.append(handler.stream)
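    # DaemonContext closes all open file descriptors except those listed in
    # files_preserve; keeping the log handlers' streams open lets logging keep
    # working after the process daemonizes.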

    daemon_context = daemon.DaemonContext(
        files_preserve=files_preserve,
        umask=022)

    daemon_context.open()

    # Catch every exception, make sure it gets logged properly
    try:
        daemon_mode(opts)
    except Exception:
        log.exception("Unknown error")
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :