Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ ff77b897

History | View | Annotate | Download (12 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39

    
40
# Fix path to import synnefo settings
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45

    
46
os.environ['DJANGO_SETTINGS_MODULE'] = 'synnefo.settings'
47
from django.conf import settings
48

    
49
from django.db import close_connection
50

    
51
import time
52

    
53
import daemon
54
import daemon.runner
55
from lockfile import LockTimeout
56
# Take care of differences between python-daemon versions.
57
try:
58
    from daemon import pidfile as pidlockfile
59
except:
60
    from daemon import pidlockfile
61
import setproctitle
62

    
63
from synnefo.lib.amqp import AMQPClient
64
from synnefo.logic import callbacks
65
from synnefo.logic import queues
66

    
67
import logging
68

    
69
log = logging.getLogger("dispatcher")
70
log_amqp = logging.getLogger("amqp")
71
log_logic = logging.getLogger("synnefo.logic")
72

    
73
LOGGERS = [log, log_amqp, log_logic]
74

    
75

    
76
class Dispatcher:
77
    debug = False
78

    
79
    def __init__(self, debug=False):
80
        self.debug = debug
81
        self._init()
82

    
83
    def wait(self):
84
        log.info("Waiting for messages..")
85
        timeout = 600
86
        while True:
87
            try:
88
                # Close the Django DB connection before processing
89
                # every incoming message. This plays nicely with
90
                # DB connection pooling, if enabled and allows
91
                # the dispatcher to recover from broken connections
92
                # gracefully.
93
                close_connection()
94
                msg = self.client.basic_wait(timeout=timeout)
95
                if not msg:
96
                    log.warning("Idle connection for %d seconds. Will connect"
97
                                " to a different host. Verify that"
98
                                " snf-ganeti-eventd is running!!", timeout)
99
                    self.client.reconnect()
100
            except SystemExit:
101
                break
102
            except KeyboardInterrupt:
103
                break
104
            except Exception as e:
105
                log.exception("Caught unexpected exception: %s", e)
106

    
107
        self.client.basic_cancel()
108
        self.client.close()
109

    
110
    def _init(self):
111
        log.info("Initializing")
112

    
113
        self.client = AMQPClient(logger=log_amqp)
114
        # Connect to AMQP host
115
        self.client.connect()
116

    
117
        # Declare queues and exchanges
118
        exchange = settings.EXCHANGE_GANETI
119
        exchange_dl = queues.convert_exchange_to_dead(exchange)
120
        self.client.exchange_declare(exchange=exchange,
121
                                     type="topic")
122
        self.client.exchange_declare(exchange=exchange_dl,
123
                                     type="topic")
124

    
125
        for queue in queues.QUEUES:
126
            # Queues are mirrored to all RabbitMQ brokers
127
            self.client.queue_declare(queue=queue, mirrored=True,
128
                                      dead_letter_exchange=exchange_dl)
129
            # Declare the corresponding dead-letter queue
130
            queue_dl = queues.convert_queue_to_dead(queue)
131
            self.client.queue_declare(queue=queue_dl, mirrored=True)
132

    
133
        # Bind queues to handler methods
134
        for binding in queues.BINDINGS:
135
            try:
136
                callback = getattr(callbacks, binding[3])
137
            except AttributeError:
138
                log.error("Cannot find callback %s", binding[3])
139
                raise SystemExit(1)
140
            queue = binding[0]
141
            exchange = binding[1]
142
            routing_key = binding[2]
143

    
144
            self.client.queue_bind(queue=queue, exchange=exchange,
145
                                   routing_key=routing_key)
146

    
147
            self.client.basic_consume(queue=binding[0],
148
                                      callback=callback,
149
                                      prefetch_count=5)
150

    
151
            queue_dl = queues.convert_queue_to_dead(queue)
152
            exchange_dl = queues.convert_exchange_to_dead(exchange)
153
            # Bind the corresponding dead-letter queue
154
            self.client.queue_bind(queue=queue_dl,
155
                                   exchange=exchange_dl,
156
                                   routing_key=routing_key)
157

    
158
            log.debug("Binding %s(%s) to queue %s with handler %s",
159
                      exchange, routing_key, queue, binding[3])
160

    
161

    
162
def parse_arguments(args):
163
    from optparse import OptionParser
164

    
165
    default_pid_file = \
166
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
167
    parser = OptionParser()
168
    parser.add_option("-d", "--debug", action="store_true", default=False,
169
                      dest="debug", help="Enable debug mode")
170
    parser.add_option("-w", "--workers", default=2, dest="workers",
171
                      help="Number of workers to spawn", type="int")
172
    parser.add_option("-p", "--pid-file", dest="pid_file",
173
                      default=default_pid_file,
174
                      help="Save PID to file (default: %s)" % default_pid_file)
175
    parser.add_option("--purge-queues", action="store_true",
176
                      default=False, dest="purge_queues",
177
                      help="Remove all declared queues (DANGEROUS!)")
178
    parser.add_option("--purge-exchanges", action="store_true",
179
                      default=False, dest="purge_exchanges",
180
                      help="Remove all exchanges. Implies deleting all queues \
181
                           first (DANGEROUS!)")
182
    parser.add_option("--drain-queue", dest="drain_queue",
183
                      help="Strips a queue from all outstanding messages")
184

    
185
    return parser.parse_args(args)
186

    
187

    
188
def purge_queues():
189
    """
190
        Delete declared queues from RabbitMQ. Use with care!
191
    """
192
    client = AMQPClient(max_retries=120)
193
    client.connect()
194

    
195
    print "Queues to be deleted: ", queues.QUEUES
196

    
197
    if not get_user_confirmation():
198
        return
199

    
200
    for queue in queues.QUEUES:
201
        result = client.queue_delete(queue=queue)
202
        print "Deleting queue %s. Result: %s" % (queue, result)
203

    
204
    client.close()
205

    
206

    
207
def purge_exchanges():
208
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
209
    purge_queues()
210

    
211
    client = AMQPClient()
212
    client.connect()
213

    
214
    exchanges = queues.EXCHANGES
215
    print "Exchanges to be deleted: ", exchanges
216

    
217
    if not get_user_confirmation():
218
        return
219

    
220
    for exch in exchanges:
221
        result = client.exchange_delete(exchange=exch)
222
        print "Deleting exchange %s. Result: %s" % (exch, result)
223
    client.close()
224

    
225

    
226
def drain_queue(queue):
227
    """Strip a (declared) queue from all outstanding messages"""
228
    if not queue:
229
        return
230

    
231
    if not queue in queues.QUEUES:
232
        print "Queue %s not configured" % queue
233
        return
234

    
235
    print "Queue to be drained: %s" % queue
236

    
237
    if not get_user_confirmation():
238
        return
239

    
240
    client = AMQPClient()
241
    client.connect()
242

    
243
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
244

    
245
    print "Queue draining about to start, hit Ctrl+c when done"
246
    time.sleep(2)
247
    print "Queue draining starting"
248

    
249
    num_processed = 0
250
    while True:
251
        client.basic_wait()
252
        num_processed += 1
253
        sys.stderr.write("Ignored %d messages\r" % num_processed)
254

    
255
    client.basic_cancel(tag)
256
    client.close()
257

    
258

    
259
def get_user_confirmation():
260
    ans = raw_input("Are you sure (N/y):")
261

    
262
    if not ans:
263
        return False
264
    if ans not in ['Y', 'y']:
265
        return False
266
    return True
267

    
268

    
269
def debug_mode():
270
    disp = Dispatcher(debug=True)
271
    disp.wait()
272

    
273

    
274
def daemon_mode(opts):
275
    disp = Dispatcher(debug=False)
276
    disp.wait()
277

    
278

    
279
def setup_logging(opts):
280
    import logging
281
    formatter = logging.Formatter("%(asctime)s %(name)s %(module)s"
282
                                  " [%(levelname)s] %(message)s")
283
    if opts.debug:
284
        log_handler = logging.StreamHandler()
285
        log_handler.setFormatter(formatter)
286
    else:
287
        import logging.handlers
288
        log_file = "/var/log/synnefo/dispatcher.log"
289
        log_handler = logging.handlers.WatchedFileHandler(log_file)
290
        log_handler.setFormatter(formatter)
291

    
292
    for l in LOGGERS:
293
        l.addHandler(log_handler)
294
        l.setLevel(logging.DEBUG)
295

    
296

    
297
def main():
298
    (opts, args) = parse_arguments(sys.argv[1:])
299

    
300
    # Rename this process so 'ps' output looks like this is a native
301
    # executable.  Cannot seperate command-line arguments from actual name of
302
    # the executable by NUL bytes, so only show the name of the executable
303
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
304
    setproctitle.setproctitle(sys.argv[0])
305
    setup_logging(opts)
306

    
307
    # Special case for the clean up queues action
308
    if opts.purge_queues:
309
        purge_queues()
310
        return
311

    
312
    # Special case for the clean up exch action
313
    if opts.purge_exchanges:
314
        purge_exchanges()
315
        return
316

    
317
    if opts.drain_queue:
318
        drain_queue(opts.drain_queue)
319
        return
320

    
321
    # Debug mode, process messages without daemonizing
322
    if opts.debug:
323
        debug_mode()
324
        return
325

    
326
    # Create pidfile,
327
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
328

    
329
    if daemon.runner.is_pidfile_stale(pidf):
330
        log.warning("Removing stale PID lock file %s", pidf.path)
331
        pidf.break_lock()
332

    
333
    files_preserve = []
334
    for handler in log.handlers:
335
        stream = getattr(handler, 'stream')
336
        if stream and hasattr(stream, 'fileno'):
337
            files_preserve.append(handler.stream)
338

    
339
    stderr_stream = None
340
    for handler in log.handlers:
341
        stream = getattr(handler, 'stream')
342
        if stream and hasattr(handler, 'baseFilename'):
343
            stderr_stream = stream
344
            break
345

    
346
    daemon_context = daemon.DaemonContext(
347
        pidfile=pidf,
348
        umask=0022,
349
        stdout=stderr_stream,
350
        stderr=stderr_stream,
351
        files_preserve=files_preserve)
352

    
353
    try:
354
        daemon_context.open()
355
    except (pidlockfile.AlreadyLocked, LockTimeout):
356
        log.critical("Failed to lock pidfile %s, another instance running?",
357
                     pidf.path)
358
        sys.exit(1)
359

    
360
    log.info("Became a daemon")
361

    
362
    if 'gevent' in sys.modules:
363
        # A fork() has occured while daemonizing. If running in
364
        # gevent context we *must* reinit gevent
365
        log.debug("gevent imported. Reinitializing gevent")
366
        import gevent
367
        gevent.reinit()
368

    
369
    # Catch every exception, make sure it gets logged properly
370
    try:
371
        daemon_mode(opts)
372
    except Exception:
373
        log.exception("Unknown error")
374
        raise
375

    
376
if __name__ == "__main__":
377
    sys.exit(main())
378

    
379
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :