Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ d222936b

History | View | Annotate | Download (12.1 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39

    
40
# Fix path to import synnefo settings
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45

    
46
os.environ['DJANGO_SETTINGS_MODULE'] = 'synnefo.settings'
47
from django.conf import settings
48

    
49
from django.db import close_connection
50

    
51
import time
52

    
53
import daemon
54
import daemon.runner
55
from lockfile import LockTimeout
56
# Take care of differences between python-daemon versions.
57
try:
58
    from daemon import pidfile as pidlockfile
59
except:
60
    from daemon import pidlockfile
61
import setproctitle
62

    
63
from synnefo.lib.amqp import AMQPClient
64
from synnefo.logic import callbacks
65
from synnefo.logic import queues
66

    
67
import logging
68
import select
69
import errno
70

    
71
log = logging.getLogger("dispatcher")
72
log_amqp = logging.getLogger("amqp")
73
log_logic = logging.getLogger("synnefo.logic")
74

    
75
LOGGERS = [log, log_amqp, log_logic]
76

    
77

    
78
class Dispatcher:
79
    debug = False
80

    
81
    def __init__(self, debug=False):
82
        self.debug = debug
83
        self._init()
84

    
85
    def wait(self):
86
        log.info("Waiting for messages..")
87
        timeout = 600
88
        while True:
89
            try:
90
                # Close the Django DB connection before processing
91
                # every incoming message. This plays nicely with
92
                # DB connection pooling, if enabled and allows
93
                # the dispatcher to recover from broken connections
94
                # gracefully.
95
                close_connection()
96
                msg = self.client.basic_wait(timeout=timeout)
97
                if not msg:
98
                    log.warning("Idle connection for %d seconds. Will connect"
99
                                " to a different host. Verify that"
100
                                " snf-ganeti-eventd is running!!", timeout)
101
                    self.client.reconnect()
102
            except select.error as e:
103
                if e[0] != errno.EINTR:
104
                    log.exception("Caught unexpected exception: %s", e)
105
                else:
106
                    break
107
            except (SystemExit, KeyboardInterrupt):
108
                break
109
            except Exception as e:
110
                log.exception("Caught unexpected exception: %s", e)
111

    
112
        self.client.basic_cancel()
113
        self.client.close()
114

    
115
    def _init(self):
116
        log.info("Initializing")
117

    
118
        self.client = AMQPClient(logger=log_amqp)
119
        # Connect to AMQP host
120
        self.client.connect()
121

    
122
        # Declare queues and exchanges
123
        exchange = settings.EXCHANGE_GANETI
124
        exchange_dl = queues.convert_exchange_to_dead(exchange)
125
        self.client.exchange_declare(exchange=exchange,
126
                                     type="topic")
127
        self.client.exchange_declare(exchange=exchange_dl,
128
                                     type="topic")
129

    
130
        for queue in queues.QUEUES:
131
            # Queues are mirrored to all RabbitMQ brokers
132
            self.client.queue_declare(queue=queue, mirrored=True,
133
                                      dead_letter_exchange=exchange_dl)
134
            # Declare the corresponding dead-letter queue
135
            queue_dl = queues.convert_queue_to_dead(queue)
136
            self.client.queue_declare(queue=queue_dl, mirrored=True)
137

    
138
        # Bind queues to handler methods
139
        for binding in queues.BINDINGS:
140
            try:
141
                callback = getattr(callbacks, binding[3])
142
            except AttributeError:
143
                log.error("Cannot find callback %s", binding[3])
144
                raise SystemExit(1)
145
            queue = binding[0]
146
            exchange = binding[1]
147
            routing_key = binding[2]
148

    
149
            self.client.queue_bind(queue=queue, exchange=exchange,
150
                                   routing_key=routing_key)
151

    
152
            self.client.basic_consume(queue=binding[0],
153
                                      callback=callback,
154
                                      prefetch_count=5)
155

    
156
            queue_dl = queues.convert_queue_to_dead(queue)
157
            exchange_dl = queues.convert_exchange_to_dead(exchange)
158
            # Bind the corresponding dead-letter queue
159
            self.client.queue_bind(queue=queue_dl,
160
                                   exchange=exchange_dl,
161
                                   routing_key=routing_key)
162

    
163
            log.debug("Binding %s(%s) to queue %s with handler %s",
164
                      exchange, routing_key, queue, binding[3])
165

    
166

    
167
def parse_arguments(args):
168
    from optparse import OptionParser
169

    
170
    default_pid_file = \
171
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
172
    parser = OptionParser()
173
    parser.add_option("-d", "--debug", action="store_true", default=False,
174
                      dest="debug", help="Enable debug mode")
175
    parser.add_option("-w", "--workers", default=2, dest="workers",
176
                      help="Number of workers to spawn", type="int")
177
    parser.add_option("-p", "--pid-file", dest="pid_file",
178
                      default=default_pid_file,
179
                      help="Save PID to file (default: %s)" % default_pid_file)
180
    parser.add_option("--purge-queues", action="store_true",
181
                      default=False, dest="purge_queues",
182
                      help="Remove all declared queues (DANGEROUS!)")
183
    parser.add_option("--purge-exchanges", action="store_true",
184
                      default=False, dest="purge_exchanges",
185
                      help="Remove all exchanges. Implies deleting all queues \
186
                           first (DANGEROUS!)")
187
    parser.add_option("--drain-queue", dest="drain_queue",
188
                      help="Strips a queue from all outstanding messages")
189

    
190
    return parser.parse_args(args)
191

    
192

    
193
def purge_queues():
194
    """
195
        Delete declared queues from RabbitMQ. Use with care!
196
    """
197
    client = AMQPClient(max_retries=120)
198
    client.connect()
199

    
200
    print "Queues to be deleted: ", queues.QUEUES
201

    
202
    if not get_user_confirmation():
203
        return
204

    
205
    for queue in queues.QUEUES:
206
        result = client.queue_delete(queue=queue)
207
        print "Deleting queue %s. Result: %s" % (queue, result)
208

    
209
    client.close()
210

    
211

    
212
def purge_exchanges():
213
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
214
    purge_queues()
215

    
216
    client = AMQPClient()
217
    client.connect()
218

    
219
    exchanges = queues.EXCHANGES
220
    print "Exchanges to be deleted: ", exchanges
221

    
222
    if not get_user_confirmation():
223
        return
224

    
225
    for exch in exchanges:
226
        result = client.exchange_delete(exchange=exch)
227
        print "Deleting exchange %s. Result: %s" % (exch, result)
228
    client.close()
229

    
230

    
231
def drain_queue(queue):
232
    """Strip a (declared) queue from all outstanding messages"""
233
    if not queue:
234
        return
235

    
236
    if not queue in queues.QUEUES:
237
        print "Queue %s not configured" % queue
238
        return
239

    
240
    print "Queue to be drained: %s" % queue
241

    
242
    if not get_user_confirmation():
243
        return
244

    
245
    client = AMQPClient()
246
    client.connect()
247

    
248
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
249

    
250
    print "Queue draining about to start, hit Ctrl+c when done"
251
    time.sleep(2)
252
    print "Queue draining starting"
253

    
254
    num_processed = 0
255
    while True:
256
        client.basic_wait()
257
        num_processed += 1
258
        sys.stderr.write("Ignored %d messages\r" % num_processed)
259

    
260
    client.basic_cancel(tag)
261
    client.close()
262

    
263

    
264
def get_user_confirmation():
265
    ans = raw_input("Are you sure (N/y):")
266

    
267
    if not ans:
268
        return False
269
    if ans not in ['Y', 'y']:
270
        return False
271
    return True
272

    
273

    
274
def debug_mode():
275
    disp = Dispatcher(debug=True)
276
    disp.wait()
277

    
278

    
279
def daemon_mode(opts):
280
    disp = Dispatcher(debug=False)
281
    disp.wait()
282

    
283

    
284
def setup_logging(opts):
285
    import logging
286
    formatter = logging.Formatter("%(asctime)s %(name)s %(module)s"
287
                                  " [%(levelname)s] %(message)s")
288
    if opts.debug:
289
        log_handler = logging.StreamHandler()
290
        log_handler.setFormatter(formatter)
291
    else:
292
        import logging.handlers
293
        log_file = "/var/log/synnefo/dispatcher.log"
294
        log_handler = logging.handlers.WatchedFileHandler(log_file)
295
        log_handler.setFormatter(formatter)
296

    
297
    for l in LOGGERS:
298
        l.addHandler(log_handler)
299
        l.setLevel(logging.DEBUG)
300

    
301

    
302
def main():
303
    (opts, args) = parse_arguments(sys.argv[1:])
304

    
305
    # Rename this process so 'ps' output looks like this is a native
306
    # executable.  Cannot seperate command-line arguments from actual name of
307
    # the executable by NUL bytes, so only show the name of the executable
308
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
309
    setproctitle.setproctitle(sys.argv[0])
310
    setup_logging(opts)
311

    
312
    # Special case for the clean up queues action
313
    if opts.purge_queues:
314
        purge_queues()
315
        return
316

    
317
    # Special case for the clean up exch action
318
    if opts.purge_exchanges:
319
        purge_exchanges()
320
        return
321

    
322
    if opts.drain_queue:
323
        drain_queue(opts.drain_queue)
324
        return
325

    
326
    # Debug mode, process messages without daemonizing
327
    if opts.debug:
328
        debug_mode()
329
        return
330

    
331
    # Create pidfile,
332
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
333

    
334
    if daemon.runner.is_pidfile_stale(pidf):
335
        log.warning("Removing stale PID lock file %s", pidf.path)
336
        pidf.break_lock()
337

    
338
    files_preserve = []
339
    for handler in log.handlers:
340
        stream = getattr(handler, 'stream')
341
        if stream and hasattr(stream, 'fileno'):
342
            files_preserve.append(handler.stream)
343

    
344
    stderr_stream = None
345
    for handler in log.handlers:
346
        stream = getattr(handler, 'stream')
347
        if stream and hasattr(handler, 'baseFilename'):
348
            stderr_stream = stream
349
            break
350

    
351
    daemon_context = daemon.DaemonContext(
352
        pidfile=pidf,
353
        umask=0022,
354
        stdout=stderr_stream,
355
        stderr=stderr_stream,
356
        files_preserve=files_preserve)
357

    
358
    try:
359
        daemon_context.open()
360
    except (pidlockfile.AlreadyLocked, LockTimeout):
361
        log.critical("Failed to lock pidfile %s, another instance running?",
362
                     pidf.path)
363
        sys.exit(1)
364

    
365
    log.info("Became a daemon")
366

    
367
    if 'gevent' in sys.modules:
368
        # A fork() has occured while daemonizing. If running in
369
        # gevent context we *must* reinit gevent
370
        log.debug("gevent imported. Reinitializing gevent")
371
        import gevent
372
        gevent.reinit()
373

    
374
    # Catch every exception, make sure it gets logged properly
375
    try:
376
        daemon_mode(opts)
377
    except Exception:
378
        log.exception("Unknown error")
379
        raise
380

    
381
if __name__ == "__main__":
382
    sys.exit(main())
383

    
384
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :