Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 2ef10562

History | View | Annotate | Download (12.6 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from django.db import close_connection
50

    
51
import logging
52
import time
53

    
54
import daemon
55
import daemon.runner
56
from lockfile import LockTimeout
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile as pidlockfile
60
except:
61
    from daemon import pidlockfile
62
import setproctitle
63

    
64
from synnefo.lib.amqp import AMQPClient
65
from synnefo.logic import callbacks
66

    
67
from synnefo.util.dictconfig import dictConfig
68
dictConfig(settings.DISPATCHER_LOGGING)
69
log = logging.getLogger()
70

    
71
# Queue names
72
QUEUES = []
73

    
74
# Queue bindings to exchanges
75
BINDINGS = []
76

    
77

    
78
class Dispatcher:
    """Consume AMQP messages and dispatch them to handler callbacks.

    Connects to the AMQP broker, declares the exchanges/queues found in
    the module-level QUEUES and BINDINGS tables (filled in by
    _init_queues()) and consumes messages in an endless loop, handing
    each one to the callback its binding names.
    """
    debug = False

    def __init__(self, debug=False):
        """Set up the AMQP connection, declarations and consumers."""
        self.debug = debug
        self._init()

    def wait(self):
        """Main loop: block on incoming messages and dispatch them.

        Runs until SystemExit is raised (e.g. from a handler); any other
        exception is logged and the loop continues, so one bad message
        cannot kill the dispatcher.
        """
        log.info("Waiting for messages..")
        while True:
            try:
                # Close the Django DB connection before processing
                # every incoming message. This plays nicely with
                # DB connection pooling, if enabled and allows
                # the dispatcher to recover from broken connections
                # gracefully.
                close_connection()
                self.client.basic_wait()
            except SystemExit:
                break
            except Exception as e:
                log.exception("Caught unexpected exception: %s", e)

        self.client.basic_cancel()
        self.client.close()

    def _init(self):
        """Connect to the broker; declare exchanges, queues and bindings."""
        global QUEUES, BINDINGS
        log.info("Initializing")

        self.client = AMQPClient()
        # Connect to AMQP host
        self.client.connect()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.client.exchange_declare(exchange=exchange,
                                         type="topic")

        for queue in QUEUES:
            # Queues are mirrored to all RabbitMQ brokers
            self.client.queue_declare(queue=queue, mirrored=True)

        bindings = BINDINGS

        # Bind queues to handler methods.
        # Each binding is (queue, exchange, routing_key, handler_name).
        for binding in bindings:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                log.error("Cannot find callback %s", binding[3])
                raise SystemExit(1)

            self.client.queue_bind(queue=binding[0], exchange=binding[1],
                                   routing_key=binding[2])

            self.client.basic_consume(queue=binding[0],
                                      callback=callback)

            log.debug("Binding %s(%s) to queue %s with handler %s",
                      binding[1], binding[2], binding[0], binding[3])
139

    
140

    
141
def _init_queues():
    """Populate the global QUEUES and BINDINGS tables.

    Queue names are prefixed with the first dash-separated component of
    settings.BACKEND_PREFIX_ID so multiple deployments can share a
    broker. When settings.DEBUG is True, an extra catch-all debug queue
    is bound to every exchange with the 'dummy_proc' handler.
    """
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
    QUEUE_RECONC = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages

    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK,
              QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
              QUEUE_GANETI_BUILD_PROGR)

    # notifications of type "ganeti-op-status"
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
    # notifications of type "ganeti-network-status"
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
    # notifications of type "ganeti-net-status"
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
    # notifications of type "ganeti-create-progress"
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
    # reconciliation
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix

    # NOTE(review): QUEUE_RECONC is declared (and listed in QUEUES) but
    # has no entry in BINDINGS, and RECONC_HANDLER is unused here —
    # confirm whether reconciliation consumption happens elsewhere.
    BINDINGS = [
        # Queue                       # Exchange                # RouteKey              # Handler
        (QUEUE_GANETI_EVENTS_OP,      settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
        (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
        (QUEUE_GANETI_EVENTS_NET,     settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
        (QUEUE_GANETI_BUILD_PROGR,    settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue       # Exchange                # RouteKey  # Handler
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',        'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',        'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',        'dummy_proc'),
        ]
        QUEUES += (QUEUE_DEBUG,)
185

    
186

    
187
def parse_arguments(args):
    """Parse command-line arguments.

    Returns the ``(options, args)`` pair produced by optparse.
    """
    from optparse import OptionParser

    # Builds "/var/run/synnefo/dispatcher.pid": join with "." then strip
    # the leading dot to obtain an absolute POSIX path.
    default_pid_file = \
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", "--pid-file", dest="pid_file",
                      default=default_pid_file,
                      help="Save PID to file (default: %s)" % default_pid_file)
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      # BUG FIX: the old backslash line continuation embedded
                      # a run of literal indentation spaces inside the help
                      # text; implicit concatenation keeps it a single clean
                      # sentence.
                      help="Remove all exchanges. Implies deleting all "
                           "queues first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Strips a queue from all outstanding messages")

    return parser.parse_args(args)
211

    
212

    
213
def purge_queues():
214
    """
215
        Delete declared queues from RabbitMQ. Use with care!
216
    """
217
    global QUEUES, BINDINGS
218
    client = AMQPClient(max_retries=120)
219
    client.connect()
220

    
221
    print "Queues to be deleted: ", QUEUES
222

    
223
    if not get_user_confirmation():
224
        return
225

    
226
    for queue in QUEUES:
227
        result = client.queue_delete(queue=queue)
228
        print "Deleting queue %s. Result: %s" % (queue, result)
229

    
230
    client.close()
231

    
232

    
233
def purge_exchanges():
234
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
235
    global QUEUES, BINDINGS
236
    purge_queues()
237

    
238
    client = AMQPClient()
239
    client.connect()
240

    
241
    print "Exchanges to be deleted: ", settings.EXCHANGES
242

    
243
    if not get_user_confirmation():
244
        return
245

    
246
    for exchange in settings.EXCHANGES:
247
        result = client.exchange_delete(exchange=exchange)
248
        print "Deleting exchange %s. Result: %s" % (exchange, result)
249

    
250
    client.close()
251

    
252

    
253
def drain_queue(queue):
254
    """Strip a (declared) queue from all outstanding messages"""
255
    global QUEUES, BINDINGS
256
    if not queue:
257
        return
258

    
259
    if not queue in QUEUES:
260
        print "Queue %s not configured" % queue
261
        return
262

    
263
    print "Queue to be drained: %s" % queue
264

    
265
    if not get_user_confirmation():
266
        return
267

    
268
    client = AMQPClient()
269
    client.connect()
270

    
271
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
272

    
273
    print "Queue draining about to start, hit Ctrl+c when done"
274
    time.sleep(2)
275
    print "Queue draining starting"
276

    
277
    num_processed = 0
278
    while True:
279
        client.basic_wait()
280
        num_processed += 1
281
        sys.stderr.write("Ignored %d messages\r" % num_processed)
282

    
283
    client.basic_cancel(tag)
284
    client.close()
285

    
286

    
287
def get_user_confirmation():
    """Prompt the operator; return True only on an explicit 'y' or 'Y'.

    An empty answer or anything other than 'y'/'Y' counts as "no".
    """
    answer = raw_input("Are you sure (N/y):")
    return answer in ('Y', 'y')
295

    
296

    
297
def debug_mode():
    """Run the dispatcher in the foreground, without daemonizing."""
    Dispatcher(debug=True).wait()
300

    
301

    
302
def daemon_mode(opts):
    """Run the dispatcher inside an already-daemonized process.

    ``opts`` is accepted for symmetry with the other mode entry points;
    it is not currently consulted.
    """
    Dispatcher(debug=False).wait()
305

    
306

    
307
def main():
    """Entry point: parse arguments and run the requested action.

    Queue maintenance actions (--purge-queues, --purge-exchanges,
    --drain-queue) and --debug run in the foreground and return early;
    otherwise the process daemonizes and dispatches messages until
    killed.
    """
    (opts, args) = parse_arguments(sys.argv[1:])

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not seperate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    if opts.debug:
        stream_handler = logging.StreamHandler()
        formatter = logging.Formatter(
            "%(asctime)s %(module)s %(levelname)s: %(message)s",
            "%Y-%m-%d %H:%M:%S")
        stream_handler.setFormatter(formatter)
        log.addHandler(stream_handler)

    # Init the global variables containing the queues
    _init_queues()

    # Special case for the clean up queues action
    if opts.purge_queues:
        purge_queues()
        return

    # Special case for the clean up exch action
    if opts.purge_exchanges:
        purge_exchanges()
        return

    if opts.drain_queue:
        drain_queue(opts.drain_queue)
        return

    # Debug mode, process messages without daemonizing
    if opts.debug:
        debug_mode()
        return

    # Create pidfile (10 second acquisition timeout).
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    if daemon.runner.is_pidfile_stale(pidf):
        log.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Keep the file descriptors of the log handlers' streams open across
    # daemonization, so logging keeps working after the fork.
    files_preserve = []
    for handler in log.handlers:
        # BUG FIX: pass a default so handlers without a 'stream' attribute
        # (e.g. SysLogHandler) do not raise AttributeError.
        stream = getattr(handler, 'stream', None)
        if stream and hasattr(stream, 'fileno'):
            files_preserve.append(handler.stream)

    # Redirect stdout/stderr to the first file-based log handler found,
    # so stray prints/tracebacks end up in the log file.
    stderr_stream = None
    for handler in log.handlers:
        stream = getattr(handler, 'stream', None)
        if stream and hasattr(handler, 'baseFilename'):
            stderr_stream = stream
            break

    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=0o022,  # 0o literal: valid on Python 2.6+ and 3.x (was 0022)
        stdout=stderr_stream,
        stderr=stderr_stream,
        files_preserve=files_preserve)

    try:
        daemon_context.open()
    except (pidlockfile.AlreadyLocked, LockTimeout):
        log.critical("Failed to lock pidfile %s, another instance running?",
                     pidf.path)
        sys.exit(1)

    log.info("Became a daemon")

    if 'gevent' in sys.modules:
        # A fork() has occured while daemonizing. If running in
        # gevent context we *must* reinit gevent
        log.debug("gevent imported. Reinitializing gevent")
        import gevent
        gevent.reinit()

    # Catch every exception, make sure it gets logged properly
    try:
        daemon_mode(opts)
    except Exception:
        log.exception("Unknown error")
        raise
394

    
395
if __name__ == "__main__":
396
    sys.exit(main())
397

    
398
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :