Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ ece5581b

History | View | Annotate | Download (11.9 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from django.db import close_connection
50

    
51
import time
52

    
53
import daemon
54
import daemon.runner
55
from lockfile import LockTimeout
56
# Take care of differences between python-daemon versions.
57
try:
58
    from daemon import pidfile as pidlockfile
59
except:
60
    from daemon import pidlockfile
61
import setproctitle
62

    
63
from synnefo.lib.amqp import AMQPClient
64
from synnefo.logic import callbacks
65
from synnefo.logic import queues
66

    
67
import logging
68

    
69
log = logging.getLogger("dispatcher")
70
log_amqp = logging.getLogger("amqp")
71
log_logic = logging.getLogger("synnefo.logic")
72

    
73
LOGGERS = [log, log_amqp, log_logic]
74

    
75

    
76
class Dispatcher:
77
    debug = False
78

    
79
    def __init__(self, debug=False):
80
        self.debug = debug
81
        self._init()
82

    
83
    def wait(self):
84
        log.info("Waiting for messages..")
85
        timeout = 600
86
        while True:
87
            try:
88
                # Close the Django DB connection before processing
89
                # every incoming message. This plays nicely with
90
                # DB connection pooling, if enabled and allows
91
                # the dispatcher to recover from broken connections
92
                # gracefully.
93
                close_connection()
94
                msg = self.client.basic_wait(timeout=timeout)
95
                if not msg:
96
                    log.warning("Idle connection for %d seconds. Will connect"
97
                                " to a different host. Verify that"
98
                                " snf-ganeti-eventd is running!!", timeout)
99
                    self.client.reconnect()
100
            except SystemExit:
101
                break
102
            except Exception as e:
103
                log.exception("Caught unexpected exception: %s", e)
104

    
105
        self.client.basic_cancel()
106
        self.client.close()
107

    
108
    def _init(self):
109
        log.info("Initializing")
110

    
111
        self.client = AMQPClient(logger=log_amqp)
112
        # Connect to AMQP host
113
        self.client.connect()
114

    
115
        # Declare queues and exchanges
116
        exchange = settings.EXCHANGE_GANETI
117
        exchange_dl = queues.convert_exchange_to_dead(exchange)
118
        self.client.exchange_declare(exchange=exchange,
119
                                     type="topic")
120
        self.client.exchange_declare(exchange=exchange_dl,
121
                                     type="topic")
122

    
123
        for queue in queues.QUEUES:
124
            # Queues are mirrored to all RabbitMQ brokers
125
            self.client.queue_declare(queue=queue, mirrored=True,
126
                                      dead_letter_exchange=exchange_dl)
127
            # Declare the corresponding dead-letter queue
128
            queue_dl = queues.convert_queue_to_dead(queue)
129
            self.client.queue_declare(queue=queue_dl, mirrored=True)
130

    
131
        # Bind queues to handler methods
132
        for binding in queues.BINDINGS:
133
            try:
134
                callback = getattr(callbacks, binding[3])
135
            except AttributeError:
136
                log.error("Cannot find callback %s", binding[3])
137
                raise SystemExit(1)
138
            queue = binding[0]
139
            exchange = binding[1]
140
            routing_key = binding[2]
141

    
142
            self.client.queue_bind(queue=queue, exchange=exchange,
143
                                   routing_key=routing_key)
144

    
145
            self.client.basic_consume(queue=binding[0],
146
                                      callback=callback,
147
                                      prefetch_count=5)
148

    
149
            queue_dl = queues.convert_queue_to_dead(queue)
150
            exchange_dl = queues.convert_exchange_to_dead(exchange)
151
            # Bind the corresponding dead-letter queue
152
            self.client.queue_bind(queue=queue_dl,
153
                                   exchange=exchange_dl,
154
                                   routing_key=routing_key)
155

    
156
            log.debug("Binding %s(%s) to queue %s with handler %s",
157
                      exchange, routing_key, queue, binding[3])
158

    
159

    
160
def parse_arguments(args):
161
    from optparse import OptionParser
162

    
163
    default_pid_file = \
164
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
165
    parser = OptionParser()
166
    parser.add_option("-d", "--debug", action="store_true", default=False,
167
                      dest="debug", help="Enable debug mode")
168
    parser.add_option("-w", "--workers", default=2, dest="workers",
169
                      help="Number of workers to spawn", type="int")
170
    parser.add_option("-p", "--pid-file", dest="pid_file",
171
                      default=default_pid_file,
172
                      help="Save PID to file (default: %s)" % default_pid_file)
173
    parser.add_option("--purge-queues", action="store_true",
174
                      default=False, dest="purge_queues",
175
                      help="Remove all declared queues (DANGEROUS!)")
176
    parser.add_option("--purge-exchanges", action="store_true",
177
                      default=False, dest="purge_exchanges",
178
                      help="Remove all exchanges. Implies deleting all queues \
179
                           first (DANGEROUS!)")
180
    parser.add_option("--drain-queue", dest="drain_queue",
181
                      help="Strips a queue from all outstanding messages")
182

    
183
    return parser.parse_args(args)
184

    
185

    
186
def purge_queues():
187
    """
188
        Delete declared queues from RabbitMQ. Use with care!
189
    """
190
    client = AMQPClient(max_retries=120)
191
    client.connect()
192

    
193
    print "Queues to be deleted: ", queues.QUEUES
194

    
195
    if not get_user_confirmation():
196
        return
197

    
198
    for queue in queues.QUEUES:
199
        result = client.queue_delete(queue=queue)
200
        print "Deleting queue %s. Result: %s" % (queue, result)
201

    
202
    client.close()
203

    
204

    
205
def purge_exchanges():
206
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
207
    purge_queues()
208

    
209
    client = AMQPClient()
210
    client.connect()
211

    
212
    exchanges = queues.EXCHANGES
213
    print "Exchanges to be deleted: ", exchanges
214

    
215
    if not get_user_confirmation():
216
        return
217

    
218
    for exch in exchanges:
219
        result = client.exchange_delete(exchange=exch)
220
        print "Deleting exchange %s. Result: %s" % (exch, result)
221
    client.close()
222

    
223

    
224
def drain_queue(queue):
225
    """Strip a (declared) queue from all outstanding messages"""
226
    if not queue:
227
        return
228

    
229
    if not queue in queues.QUEUES:
230
        print "Queue %s not configured" % queue
231
        return
232

    
233
    print "Queue to be drained: %s" % queue
234

    
235
    if not get_user_confirmation():
236
        return
237

    
238
    client = AMQPClient()
239
    client.connect()
240

    
241
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
242

    
243
    print "Queue draining about to start, hit Ctrl+c when done"
244
    time.sleep(2)
245
    print "Queue draining starting"
246

    
247
    num_processed = 0
248
    while True:
249
        client.basic_wait()
250
        num_processed += 1
251
        sys.stderr.write("Ignored %d messages\r" % num_processed)
252

    
253
    client.basic_cancel(tag)
254
    client.close()
255

    
256

    
257
def get_user_confirmation():
258
    ans = raw_input("Are you sure (N/y):")
259

    
260
    if not ans:
261
        return False
262
    if ans not in ['Y', 'y']:
263
        return False
264
    return True
265

    
266

    
267
def debug_mode():
268
    disp = Dispatcher(debug=True)
269
    disp.wait()
270

    
271

    
272
def daemon_mode(opts):
273
    disp = Dispatcher(debug=False)
274
    disp.wait()
275

    
276

    
277
def setup_logging(opts):
278
    import logging
279
    formatter = logging.Formatter("%(asctime)s %(name)s %(module)s"
280
                                  " [%(levelname)s] %(message)s")
281
    if opts.debug:
282
        log_handler = logging.StreamHandler()
283
        log_handler.setFormatter(formatter)
284
    else:
285
        import logging.handlers
286
        log_file = "/var/log/synnefo/dispatcher.log"
287
        log_handler = logging.handlers.WatchedFileHandler(log_file)
288
        log_handler.setFormatter(formatter)
289

    
290
    for l in LOGGERS:
291
        l.addHandler(log_handler)
292
        l.setLevel(logging.DEBUG)
293

    
294

    
295
def main():
296
    (opts, args) = parse_arguments(sys.argv[1:])
297

    
298
    # Rename this process so 'ps' output looks like this is a native
299
    # executable.  Can not seperate command-line arguments from actual name of
300
    # the executable by NUL bytes, so only show the name of the executable
301
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
302
    setproctitle.setproctitle(sys.argv[0])
303
    setup_logging(opts)
304

    
305
    # Special case for the clean up queues action
306
    if opts.purge_queues:
307
        purge_queues()
308
        return
309

    
310
    # Special case for the clean up exch action
311
    if opts.purge_exchanges:
312
        purge_exchanges()
313
        return
314

    
315
    if opts.drain_queue:
316
        drain_queue(opts.drain_queue)
317
        return
318

    
319
    # Debug mode, process messages without daemonizing
320
    if opts.debug:
321
        debug_mode()
322
        return
323

    
324
    # Create pidfile,
325
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
326

    
327
    if daemon.runner.is_pidfile_stale(pidf):
328
        log.warning("Removing stale PID lock file %s", pidf.path)
329
        pidf.break_lock()
330

    
331
    files_preserve = []
332
    for handler in log.handlers:
333
        stream = getattr(handler, 'stream')
334
        if stream and hasattr(stream, 'fileno'):
335
            files_preserve.append(handler.stream)
336

    
337
    stderr_stream = None
338
    for handler in log.handlers:
339
        stream = getattr(handler, 'stream')
340
        if stream and hasattr(handler, 'baseFilename'):
341
            stderr_stream = stream
342
            break
343

    
344
    daemon_context = daemon.DaemonContext(
345
        pidfile=pidf,
346
        umask=0022,
347
        stdout=stderr_stream,
348
        stderr=stderr_stream,
349
        files_preserve=files_preserve)
350

    
351
    try:
352
        daemon_context.open()
353
    except (pidlockfile.AlreadyLocked, LockTimeout):
354
        log.critical("Failed to lock pidfile %s, another instance running?",
355
                     pidf.path)
356
        sys.exit(1)
357

    
358
    log.info("Became a daemon")
359

    
360
    if 'gevent' in sys.modules:
361
        # A fork() has occured while daemonizing. If running in
362
        # gevent context we *must* reinit gevent
363
        log.debug("gevent imported. Reinitializing gevent")
364
        import gevent
365
        gevent.reinit()
366

    
367
    # Catch every exception, make sure it gets logged properly
368
    try:
369
        daemon_mode(opts)
370
    except Exception:
371
        log.exception("Unknown error")
372
        raise
373

    
374
if __name__ == "__main__":
375
    sys.exit(main())
376

    
377
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :