Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 2e1e6844

History | View | Annotate | Download (11.9 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from django.db import close_connection
50

    
51
import time
52

    
53
import daemon
54
import daemon.runner
55
from lockfile import LockTimeout
56
# Take care of differences between python-daemon versions.
57
try:
58
    from daemon import pidfile as pidlockfile
59
except:
60
    from daemon import pidlockfile
61
import setproctitle
62

    
63
from synnefo.lib.amqp import AMQPClient
64
from synnefo.logic import callbacks
65
from synnefo.logic import queues
66

    
67
import logging
68

    
69
log = logging.getLogger("dispatcher")
70
log_amqp = logging.getLogger("amqp")
71
log_logic = logging.getLogger("synnefo.logic")
72

    
73
LOGGERS = [log, log_amqp, log_logic]
74

    
75

    
76
class Dispatcher:
77
    debug = False
78

    
79
    def __init__(self, debug=False):
80
        self.debug = debug
81
        self._init()
82

    
83
    def wait(self):
84
        log.info("Waiting for messages..")
85
        timeout = 600
86
        while True:
87
            try:
88
                # Close the Django DB connection before processing
89
                # every incoming message. This plays nicely with
90
                # DB connection pooling, if enabled and allows
91
                # the dispatcher to recover from broken connections
92
                # gracefully.
93
                close_connection()
94
                msg = self.client.basic_wait(timeout=timeout)
95
                if not msg:
96
                    log.warning("Idle connection for %d seconds. Will connect"
97
                                " to a different host. Verify that"
98
                                " snf-ganeti-eventd is running!!", timeout)
99
                    self.client.reconnect()
100
            except SystemExit:
101
                break
102
            except Exception as e:
103
                log.exception("Caught unexpected exception: %s", e)
104

    
105
        self.client.basic_cancel()
106
        self.client.close()
107

    
108
    def _init(self):
109
        log.info("Initializing")
110

    
111
        self.client = AMQPClient(logger=log_amqp)
112
        # Connect to AMQP host
113
        self.client.connect()
114

    
115
        # Declare queues and exchanges
116
        exchange = settings.EXCHANGE_GANETI
117
        exchange_dl = queues.convert_exchange_to_dead(exchange)
118
        self.client.exchange_declare(exchange=exchange,
119
                                     type="topic")
120
        self.client.exchange_declare(exchange=exchange_dl,
121
                                     type="topic")
122

    
123
        for queue in queues.QUEUES:
124
            # Queues are mirrored to all RabbitMQ brokers
125
            self.client.queue_declare(queue=queue, mirrored=True,
126
                                      dead_letter_exchange=exchange_dl)
127
            # Declare the corresponding dead-letter queue
128
            queue_dl = queues.convert_queue_to_dead(queue)
129
            self.client.queue_declare(queue=queue_dl, mirrored=True)
130

    
131
        # Bind queues to handler methods
132
        for binding in queues.BINDINGS:
133
            try:
134
                callback = getattr(callbacks, binding[3])
135
            except AttributeError:
136
                log.error("Cannot find callback %s", binding[3])
137
                raise SystemExit(1)
138
            queue = binding[0]
139
            exchange = binding[1]
140
            routing_key = binding[2]
141

    
142
            self.client.queue_bind(queue=queue, exchange=exchange,
143
                                   routing_key=routing_key)
144

    
145
            self.client.basic_consume(queue=binding[0],
146
                                      callback=callback,
147
                                      prefetch_count=5)
148

    
149
            queue_dl = queues.convert_queue_to_dead(queue)
150
            exchange_dl = queues.convert_exchange_to_dead(exchange)
151
            # Bind the corresponding dead-letter queue
152
            self.client.queue_bind(queue=queue_dl,
153
                                   exchange=exchange_dl,
154
                                   routing_key=routing_key)
155

    
156
            log.debug("Binding %s(%s) to queue %s with handler %s",
157
                      exchange, routing_key, queue, binding[3])
158

    
159

    
160
def parse_arguments(args):
161
    from optparse import OptionParser
162

    
163
    default_pid_file = \
164
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
165
    parser = OptionParser()
166
    parser.add_option("-d", "--debug", action="store_true", default=False,
167
                      dest="debug", help="Enable debug mode")
168
    parser.add_option("-w", "--workers", default=2, dest="workers",
169
                      help="Number of workers to spawn", type="int")
170
    parser.add_option("-p", "--pid-file", dest="pid_file",
171
                      default=default_pid_file,
172
                      help="Save PID to file (default: %s)" % default_pid_file)
173
    parser.add_option("--purge-queues", action="store_true",
174
                      default=False, dest="purge_queues",
175
                      help="Remove all declared queues (DANGEROUS!)")
176
    parser.add_option("--purge-exchanges", action="store_true",
177
                      default=False, dest="purge_exchanges",
178
                      help="Remove all exchanges. Implies deleting all queues \
179
                           first (DANGEROUS!)")
180
    parser.add_option("--drain-queue", dest="drain_queue",
181
                      help="Strips a queue from all outstanding messages")
182

    
183
    return parser.parse_args(args)
184

    
185

    
186
def purge_queues():
187
    """
188
        Delete declared queues from RabbitMQ. Use with care!
189
    """
190
    client = AMQPClient(max_retries=120)
191
    client.connect()
192

    
193
    print "Queues to be deleted: ", queues.QUEUES
194

    
195
    if not get_user_confirmation():
196
        return
197

    
198
    for queue in queues.QUEUES:
199
        result = client.queue_delete(queue=queue)
200
        print "Deleting queue %s. Result: %s" % (queue, result)
201

    
202
    client.close()
203

    
204

    
205
def purge_exchanges():
206
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
207
    purge_queues()
208

    
209
    client = AMQPClient()
210
    client.connect()
211

    
212
    exchanges = queues.EXCHANGES
213
    print "Exchanges to be deleted: ", exchanges
214

    
215
    if not get_user_confirmation():
216
        return
217

    
218
    for exch in exchanges:
219
        result = client.exchange_delete(exchange=exch)
220
        print "Deleting exchange %s. Result: %s" % (exch, result)
221
    client.close()
222

    
223

    
224
def drain_queue(queue):
225
    """Strip a (declared) queue from all outstanding messages"""
226
    if not queue:
227
        return
228

    
229
    if not queue in queues.QUEUES:
230
        print "Queue %s not configured" % queue
231
        return
232

    
233
    print "Queue to be drained: %s" % queue
234

    
235
    if not get_user_confirmation():
236
        return
237

    
238
    client = AMQPClient()
239
    client.connect()
240

    
241
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
242

    
243
    print "Queue draining about to start, hit Ctrl+c when done"
244
    time.sleep(2)
245
    print "Queue draining starting"
246

    
247
    num_processed = 0
248
    while True:
249
        client.basic_wait()
250
        num_processed += 1
251
        sys.stderr.write("Ignored %d messages\r" % num_processed)
252

    
253
    client.basic_cancel(tag)
254
    client.close()
255

    
256

    
257
def get_user_confirmation():
258
    ans = raw_input("Are you sure (N/y):")
259

    
260
    if not ans:
261
        return False
262
    if ans not in ['Y', 'y']:
263
        return False
264
    return True
265

    
266

    
267
def debug_mode():
268
    disp = Dispatcher(debug=True)
269
    disp.wait()
270

    
271

    
272
def daemon_mode(opts):
273
    disp = Dispatcher(debug=False)
274
    disp.wait()
275

    
276

    
277
def setup_logging(opts):
278
    import logging
279
    formatter = logging.Formatter("%(asctime)s %(name)s %(module)s [%(levelname)s] %(message)s")
280
    if opts.debug:
281
        log_handler = logging.StreamHandler()
282
        log_handler.setFormatter(formatter)
283
    else:
284
        import logging.handlers
285
        log_file = "/var/log/synnefo/dispatcher.log"
286
        log_handler = logging.handlers.WatchedFileHandler(log_file)
287
        log_handler.setFormatter(formatter)
288

    
289
    for l in LOGGERS:
290
        l.addHandler(log_handler)
291
        l.setLevel(logging.DEBUG)
292

    
293

    
294
def main():
295
    (opts, args) = parse_arguments(sys.argv[1:])
296

    
297
    # Rename this process so 'ps' output looks like this is a native
298
    # executable.  Can not seperate command-line arguments from actual name of
299
    # the executable by NUL bytes, so only show the name of the executable
300
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
301
    setproctitle.setproctitle(sys.argv[0])
302
    setup_logging(opts)
303

    
304
    # Special case for the clean up queues action
305
    if opts.purge_queues:
306
        purge_queues()
307
        return
308

    
309
    # Special case for the clean up exch action
310
    if opts.purge_exchanges:
311
        purge_exchanges()
312
        return
313

    
314
    if opts.drain_queue:
315
        drain_queue(opts.drain_queue)
316
        return
317

    
318
    # Debug mode, process messages without daemonizing
319
    if opts.debug:
320
        debug_mode()
321
        return
322

    
323
    # Create pidfile,
324
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
325

    
326
    if daemon.runner.is_pidfile_stale(pidf):
327
        log.warning("Removing stale PID lock file %s", pidf.path)
328
        pidf.break_lock()
329

    
330
    files_preserve = []
331
    for handler in log.handlers:
332
        stream = getattr(handler, 'stream')
333
        if stream and hasattr(stream, 'fileno'):
334
            files_preserve.append(handler.stream)
335

    
336
    stderr_stream = None
337
    for handler in log.handlers:
338
        stream = getattr(handler, 'stream')
339
        if stream and hasattr(handler, 'baseFilename'):
340
            stderr_stream = stream
341
            break
342

    
343
    daemon_context = daemon.DaemonContext(
344
        pidfile=pidf,
345
        umask=0022,
346
        stdout=stderr_stream,
347
        stderr=stderr_stream,
348
        files_preserve=files_preserve)
349

    
350
    try:
351
        daemon_context.open()
352
    except (pidlockfile.AlreadyLocked, LockTimeout):
353
        log.critical("Failed to lock pidfile %s, another instance running?",
354
                     pidf.path)
355
        sys.exit(1)
356

    
357
    log.info("Became a daemon")
358

    
359
    if 'gevent' in sys.modules:
360
        # A fork() has occured while daemonizing. If running in
361
        # gevent context we *must* reinit gevent
362
        log.debug("gevent imported. Reinitializing gevent")
363
        import gevent
364
        gevent.reinit()
365

    
366
    # Catch every exception, make sure it gets logged properly
367
    try:
368
        daemon_mode(opts)
369
    except Exception:
370
        log.exception("Unknown error")
371
        raise
372

    
373
if __name__ == "__main__":
374
    sys.exit(main())
375

    
376
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :