Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 567ffb85

History | View | Annotate | Download (13.9 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45

    
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from amqplib import client_0_8 as amqp
50
from signal import signal, SIGINT, SIGTERM
51

    
52
import logging
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64
from synnefo.util.dictconfig import dictConfig
65

    
66

    
67
log = logging.getLogger()
68

    
69

    
70
# Queue names
71
QUEUES = []
72

    
73
# Queue bindings to exchanges
74
BINDINGS = []
75

    
76

    
77
class Dispatcher:
78
    chan = None
79
    debug = False
80
    clienttags = []
81

    
82
    def __init__(self, debug=False):
83
        self.debug = debug
84
        self._init()
85

    
86
    def wait(self):
87
        while True:
88
            try:
89
                self.chan.wait()
90
            except SystemExit:
91
                break
92
            except amqp.exceptions.AMQPConnectionException:
93
                log.error("Server went away, reconnecting...")
94
                self._init()
95
            except socket.error:
96
                log.error("Server went away, reconnecting...")
97
                self._init()
98
            except Exception, e:
99
                log.exception("Caught unexpected exception")
100

    
101
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
102
        self.chan.connection.close()
103
        self.chan.close()
104

    
105
    def _init(self):
106
        global QUEUES, BINDINGS
107
        log.info("Initializing")
108

    
109
        # Connect to RabbitMQ
110
        conn = None
111
        while conn == None:
112
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
113
            try:
114
                conn = amqp.Connection(host=settings.RABBIT_HOST,
115
                                       userid=settings.RABBIT_USERNAME,
116
                                       password=settings.RABBIT_PASSWORD,
117
                                       virtual_host=settings.RABBIT_VHOST)
118
            except socket.error:
119
                log.error("Failed to connect to %s, retrying in 10s",
120
                                  settings.RABBIT_HOST)
121
                time.sleep(10)
122

    
123
        log.info("Connection succesful, opening channel")
124
        self.chan = conn.channel()
125

    
126
        # Declare queues and exchanges
127
        for exchange in settings.EXCHANGES:
128
            self.chan.exchange_declare(exchange=exchange, type="topic",
129
                                       durable=True, auto_delete=False)
130

    
131
        for queue in QUEUES:
132
            self.chan.queue_declare(queue=queue, durable=True,
133
                                    exclusive=False, auto_delete=False)
134

    
135
        bindings = BINDINGS
136

    
137
        # Bind queues to handler methods
138
        for binding in bindings:
139
            try:
140
                callback = getattr(callbacks, binding[3])
141
            except AttributeError:
142
                log.error("Cannot find callback %s", binding[3])
143
                raise SystemExit(1)
144

    
145
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
146
                                 routing_key=binding[2])
147
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
148
            log.debug("Binding %s(%s) to queue %s with handler %s",
149
                              binding[1], binding[2], binding[0], binding[3])
150
            self.clienttags.append(tag)
151

    
152

    
153
def _init_queues():
154
    global QUEUES, BINDINGS
155

    
156
    # Queue declarations
157
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
158

    
159
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
160
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
161
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
162
    QUEUE_RECONC = "%s-reconciliation" % prefix
163
    if settings.DEBUG is True:
164
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
165

    
166
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
167
              QUEUE_GANETI_BUILD_PROGR)
168

    
169
    # notifications of type "ganeti-op-status"
170
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
171
    # notifications of type "ganeti-net-status"
172
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
173
    # notifications of type "ganeti-create-progress"
174
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
175
    # reconciliation
176
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
177

    
178
    BINDINGS = [
179
    # Queue                   # Exchange                # RouteKey              # Handler
180
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
181
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
182
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
183
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
184
    ]
185

    
186
    if settings.DEBUG is True:
187
        BINDINGS += [
188
            # Queue       # Exchange          # RouteKey  # Handler
189
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
190
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
191
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
192
        ]
193
        QUEUES += (QUEUE_DEBUG,)
194

    
195

    
196
def _exit_handler(signum, frame):
197
    """"Catch exit signal in children processes"""
198
    log.info("Caught signal %d, will raise SystemExit", signum)
199
    raise SystemExit
200

    
201

    
202
def _parent_handler(signum, frame):
203
    """"Catch exit signal in parent process and forward it to children."""
204
    global children
205
    log.info("Caught signal %d, sending SIGTERM to children %s",
206
                signum, children)
207
    [os.kill(pid, SIGTERM) for pid in children]
208

    
209

    
210
def child(cmdline):
211
    """The context of the child process"""
212

    
213
    # Cmd line argument parsing
214
    (opts, args) = parse_arguments(cmdline)
215
    disp = Dispatcher(debug=opts.debug)
216

    
217
    # Start the event loop
218
    disp.wait()
219

    
220

    
221
def parse_arguments(args):
222
    from optparse import OptionParser
223

    
224
    default_pid_file = os.path.join("var","run","synnefo","dispatcher.pid")
225
    parser = OptionParser()
226
    parser.add_option("-d", "--debug", action="store_true", default=False,
227
                      dest="debug", help="Enable debug mode")
228
    parser.add_option("-w", "--workers", default=2, dest="workers",
229
                      help="Number of workers to spawn", type="int")
230
    parser.add_option("-p", "--pid-file", dest="pid_file",
231
                      default=default_pid_file,
232
                      help="Save PID to file (default: %s)" % default_pid_file)
233
    parser.add_option("--purge-queues", action="store_true",
234
                      default=False, dest="purge_queues",
235
                      help="Remove all declared queues (DANGEROUS!)")
236
    parser.add_option("--purge-exchanges", action="store_true",
237
                      default=False, dest="purge_exchanges",
238
                      help="Remove all exchanges. Implies deleting all queues \
239
                           first (DANGEROUS!)")
240
    parser.add_option("--drain-queue", dest="drain_queue",
241
                      help="Strips a queue from all outstanding messages")
242

    
243
    return parser.parse_args(args)
244

    
245

    
246
def purge_queues():
247
    """
248
        Delete declared queues from RabbitMQ. Use with care!
249
    """
250
    global QUEUES, BINDINGS
251
    conn = get_connection()
252
    chan = conn.channel()
253

    
254
    print "Queues to be deleted: ", QUEUES
255

    
256
    if not get_user_confirmation():
257
        return
258

    
259
    for queue in QUEUES:
260
        try:
261
            chan.queue_delete(queue=queue)
262
            print "Deleting queue %s" % queue
263
        except amqp.exceptions.AMQPChannelException as e:
264
            print e.amqp_reply_code, " ", e.amqp_reply_text
265
            chan = conn.channel()
266

    
267
    chan.connection.close()
268

    
269

    
270
def purge_exchanges():
271
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
272
    global QUEUES, BINDINGS
273
    purge_queues()
274

    
275
    conn = get_connection()
276
    chan = conn.channel()
277

    
278
    print "Exchanges to be deleted: ", settings.EXCHANGES
279

    
280
    if not get_user_confirmation():
281
        return
282

    
283
    for exchange in settings.EXCHANGES:
284
        try:
285
            chan.exchange_delete(exchange=exchange)
286
        except amqp.exceptions.AMQPChannelException as e:
287
            print e.amqp_reply_code, " ", e.amqp_reply_text
288

    
289
    chan.connection.close()
290

    
291

    
292
def drain_queue(queue):
293
    """Strip a (declared) queue from all outstanding messages"""
294
    global QUEUES, BINDINGS
295
    if not queue:
296
        return
297

    
298
    if not queue in QUEUES:
299
        print "Queue %s not configured" % queue
300
        return
301

    
302
    print "Queue to be drained: %s" % queue
303

    
304
    if not get_user_confirmation():
305
        return
306
    conn = get_connection()
307
    chan = conn.channel()
308

    
309
    # Register a temporary queue binding
310
    for binding in BINDINGS:
311
        if binding[0] == queue:
312
            exch = binding[1]
313

    
314
    if not exch:
315
        print "Queue not bound to any exchange: %s" % queue
316
        return
317

    
318
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
319
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
320

    
321
    print "Queue draining about to start, hit Ctrl+c when done"
322
    time.sleep(2)
323
    print "Queue draining starting"
324

    
325
    signal(SIGTERM, _exit_handler)
326
    signal(SIGINT, _exit_handler)
327

    
328
    num_processed = 0
329
    while True:
330
        chan.wait()
331
        num_processed += 1
332
        sys.stderr.write("Ignored %d messages\r" % num_processed)
333

    
334
    chan.basic_cancel(tag)
335
    chan.connection.close()
336

    
337

    
338
def get_connection():
339
    conn = amqp.Connection(host=settings.RABBIT_HOST,
340
                           userid=settings.RABBIT_USERNAME,
341
                           password=settings.RABBIT_PASSWORD,
342
                           virtual_host=settings.RABBIT_VHOST)
343
    return conn
344

    
345

    
346
def get_user_confirmation():
347
    ans = raw_input("Are you sure (N/y):")
348

    
349
    if not ans:
350
        return False
351
    if ans not in ['Y', 'y']:
352
        return False
353
    return True
354

    
355

    
356
def debug_mode():
357
    disp = Dispatcher(debug=True)
358
    signal(SIGINT, _exit_handler)
359
    signal(SIGTERM, _exit_handler)
360

    
361
    disp.wait()
362

    
363

    
364
def daemon_mode(opts):
365
    global children
366

    
367
    # Create pidfile,
368
    # take care of differences between python-daemon versions
369
    try:
370
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
371
    except:
372
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
373

    
374
    pidf.acquire()
375

    
376
    log.info("Became a daemon")
377

    
378
    # Fork workers
379
    children = []
380

    
381
    i = 0
382
    while i < opts.workers:
383
        newpid = os.fork()
384

    
385
        if newpid == 0:
386
            signal(SIGINT, _exit_handler)
387
            signal(SIGTERM, _exit_handler)
388
            child(sys.argv[1:])
389
            sys.exit(1)
390
        else:
391
            log.debug("%d, forked child: %d", os.getpid(), newpid)
392
            children.append(newpid)
393
        i += 1
394

    
395
    # Catch signals to ensure graceful shutdown
396
    signal(SIGINT, _parent_handler)
397
    signal(SIGTERM, _parent_handler)
398

    
399
    # Wait for all children processes to die, one by one
400
    try:
401
        for pid in children:
402
            try:
403
                os.waitpid(pid, 0)
404
            except Exception:
405
                pass
406
    finally:
407
        pidf.release()
408

    
409

    
410
def main():
411
    dictConfig(settings.DISPATCHER_LOGGING)
412

    
413
    global log
414

    
415
    (opts, args) = parse_arguments(sys.argv[1:])
416

    
417
    # Init the global variables containing the queues
418
    _init_queues()
419

    
420
    # Special case for the clean up queues action
421
    if opts.purge_queues:
422
        purge_queues()
423
        return
424

    
425
    # Special case for the clean up exch action
426
    if opts.purge_exchanges:
427
        purge_exchanges()
428
        return
429

    
430
    if opts.drain_queue:
431
        drain_queue(opts.drain_queue)
432
        return
433

    
434
    # Debug mode, process messages without spawning workers
435
    if opts.debug:
436
        debug_mode()
437
        return
438

    
439
    files_preserve = []
440
    for handler in log.handlers:
441
        stream = getattr(handler, 'stream')
442
        if stream and hasattr(stream, 'fileno'):
443
            files_preserve.append(handler.stream)
444

    
445
    daemon_context = daemon.DaemonContext(
446
        files_preserve=files_preserve,
447
        umask=022)
448

    
449
    daemon_context.open()
450

    
451
    # Catch every exception, make sure it gets logged properly
452
    try:
453
        daemon_mode(opts)
454
    except Exception:
455
        log.exception("Unknown error")
456
        raise
457

    
458

    
459
if __name__ == "__main__":
460
    sys.exit(main())
461

    
462
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :