Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ b1bb9251

History | View | Annotate | Download (13.2 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from django.db import close_connection
50

    
51
import time
52

    
53
import daemon
54
import daemon.runner
55
from lockfile import LockTimeout
56
# Take care of differences between python-daemon versions.
57
try:
58
    from daemon import pidfile as pidlockfile
59
except:
60
    from daemon import pidlockfile
61
import setproctitle
62

    
63
from synnefo.lib.amqp import AMQPClient
64
from synnefo.logic import callbacks
65

    
66
import logging
67
log = logging.getLogger()
68

    
69
# Queue names
70
QUEUES = []
71

    
72
# Queue bindings to exchanges
73
BINDINGS = []
74

    
75

    
76
class Dispatcher:
    """Consume AMQP messages and dispatch them to the logic callbacks.

    Connects to the broker on construction, declares the configured
    exchanges/queues, and binds each queue to its handler from
    synnefo.logic.callbacks.
    """
    debug = False

    def __init__(self, debug=False):
        self.debug = debug
        self._init()

    def wait(self):
        """Main receive loop: block for messages until SystemExit."""
        log.info("Waiting for messages..")
        timeout = 600
        while True:
            try:
                # Drop the Django DB connection before handling each
                # incoming message: plays nicely with connection pooling
                # (if enabled) and lets the dispatcher recover gracefully
                # from broken DB connections.
                close_connection()
                received = self.client.basic_wait(timeout=timeout)
                if received:
                    continue
                # Nothing arrived for `timeout` seconds: assume the broker
                # (or the event producer) is unhealthy and fail over.
                log.warning("Idle connection for %d seconds. Will connect"
                            " to a different host. Verify that"
                            " snf-ganeti-eventd is running!!", timeout)
                self.client.reconnect()
            except SystemExit:
                break
            except Exception as e:
                log.exception("Caught unexpected exception: %s", e)

        self.client.basic_cancel()
        self.client.close()

    def _init(self):
        """Connect to AMQP, declare exchanges/queues, register consumers."""
        global QUEUES, BINDINGS
        log.info("Initializing")

        self.client = AMQPClient()
        self.client.connect()  # Connect to AMQP host

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.client.exchange_declare(exchange=exchange, type="topic")

        for queue in QUEUES:
            # Queues are mirrored to all RabbitMQ brokers
            self.client.queue_declare(queue=queue, mirrored=True)

        # Bind each queue to its handler method
        for queue, exchange, routing_key, handler_name in BINDINGS:
            try:
                handler = getattr(callbacks, handler_name)
            except AttributeError:
                log.error("Cannot find callback %s", handler_name)
                raise SystemExit(1)

            self.client.queue_bind(queue=queue, exchange=exchange,
                                   routing_key=routing_key)
            self.client.basic_consume(queue=queue,
                                      callback=handler,
                                      prefetch_count=5)

            log.debug("Binding %s(%s) to queue %s with handler %s",
                      exchange, routing_key, queue, handler_name)
144

    
145

    
146
def _init_queues():
    """Populate the global QUEUES and BINDINGS from the backend prefix."""
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    queue_events_op = "%s-events-op" % prefix
    queue_events_network = "%s-events-network" % prefix
    queue_events_net = "%s-events-net" % prefix
    queue_build_progress = "%s-events-progress" % prefix
    queue_reconciliation = "%s-reconciliation" % prefix

    QUEUES = (queue_events_op, queue_events_network, queue_events_net,
              queue_reconciliation, queue_build_progress)

    # Routing keys, one per notification type
    key_op = 'ganeti.%s.event.op' % prefix            # "ganeti-op-status"
    key_network = 'ganeti.%s.event.network' % prefix  # "ganeti-network-status"
    key_net = 'ganeti.%s.event.net' % prefix          # "ganeti-net-status"
    key_progress = 'ganeti.%s.event.progress' % prefix  # "ganeti-create-progress"
    # Reconciliation routing key; NOTE(review): computed but currently not
    # bound to any queue below — confirm whether that is intentional.
    key_reconc = 'reconciliation.%s.*' % prefix

    BINDINGS = [
        # Queue                 # Exchange                # RouteKey    # Handler
        (queue_events_op,       settings.EXCHANGE_GANETI, key_op,       'update_db'),
        (queue_events_network,  settings.EXCHANGE_GANETI, key_network,  'update_network'),
        (queue_events_net,      settings.EXCHANGE_GANETI, key_net,      'update_net'),
        (queue_build_progress,  settings.EXCHANGE_GANETI, key_progress, 'update_build_progress'),
    ]

    if settings.DEBUG is True:
        # Debug queue retrieves a copy of every message from all exchanges
        queue_debug = "%s-debug" % prefix
        QUEUES += (queue_debug,)
        BINDINGS += [
            # Queue        # Exchange                # RouteKey  # Handler
            (queue_debug,  settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
            (queue_debug,  settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
            (queue_debug,  settings.EXCHANGE_API,    '#',  'dummy_proc'),
        ]
190

    
191

    
192
def parse_arguments(args):
    """Parse command-line arguments.

    Returns the (options, args) pair produced by optparse's parse_args.
    """
    from optparse import OptionParser

    # Absolute default PID file path. The original built this as
    # os.path.join(".", ...)[1:], relying on string slicing to strip the
    # leading dot; joining from os.sep yields the same path explicitly.
    default_pid_file = \
        os.path.join(os.sep, "var", "run", "synnefo", "dispatcher.pid")
    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", "--pid-file", dest="pid_file",
                      default=default_pid_file,
                      help="Save PID to file (default: %s)" % default_pid_file)
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Strips a queue from all outstanding messages")

    return parser.parse_args(args)
216

    
217

    
218
def purge_queues():
219
    """
220
        Delete declared queues from RabbitMQ. Use with care!
221
    """
222
    global QUEUES, BINDINGS
223
    client = AMQPClient(max_retries=120)
224
    client.connect()
225

    
226
    print "Queues to be deleted: ", QUEUES
227

    
228
    if not get_user_confirmation():
229
        return
230

    
231
    for queue in QUEUES:
232
        result = client.queue_delete(queue=queue)
233
        print "Deleting queue %s. Result: %s" % (queue, result)
234

    
235
    client.close()
236

    
237

    
238
def purge_exchanges():
239
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
240
    global QUEUES, BINDINGS
241
    purge_queues()
242

    
243
    client = AMQPClient()
244
    client.connect()
245

    
246
    print "Exchanges to be deleted: ", settings.EXCHANGES
247

    
248
    if not get_user_confirmation():
249
        return
250

    
251
    for exchange in settings.EXCHANGES:
252
        result = client.exchange_delete(exchange=exchange)
253
        print "Deleting exchange %s. Result: %s" % (exchange, result)
254

    
255
    client.close()
256

    
257

    
258
def drain_queue(queue):
259
    """Strip a (declared) queue from all outstanding messages"""
260
    global QUEUES, BINDINGS
261
    if not queue:
262
        return
263

    
264
    if not queue in QUEUES:
265
        print "Queue %s not configured" % queue
266
        return
267

    
268
    print "Queue to be drained: %s" % queue
269

    
270
    if not get_user_confirmation():
271
        return
272

    
273
    client = AMQPClient()
274
    client.connect()
275

    
276
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
277

    
278
    print "Queue draining about to start, hit Ctrl+c when done"
279
    time.sleep(2)
280
    print "Queue draining starting"
281

    
282
    num_processed = 0
283
    while True:
284
        client.basic_wait()
285
        num_processed += 1
286
        sys.stderr.write("Ignored %d messages\r" % num_processed)
287

    
288
    client.basic_cancel(tag)
289
    client.close()
290

    
291

    
292
def get_user_confirmation():
    """Prompt the operator and return True only on an explicit 'y' or 'Y'."""
    answer = raw_input("Are you sure (N/y):")
    # Empty input and anything other than 'y'/'Y' means "no".
    return answer in ('Y', 'y')
300

    
301

    
302
def debug_mode():
    """Run the dispatcher in the foreground with debug enabled."""
    Dispatcher(debug=True).wait()
305

    
306

    
307
def daemon_mode(opts):
    """Run the dispatcher loop (called after daemonization).

    `opts` is accepted for interface symmetry with the other mode
    functions; it is not consulted here.
    """
    Dispatcher(debug=False).wait()
310

    
311

    
312
def setup_logging(opts):
    """Attach a handler to the root logger: stderr stream in debug mode,
    a watched log file otherwise. Level is always DEBUG."""
    import logging
    fmt = "%(asctime)s %(name)s %(module)s [%(levelname)s] %(message)s"
    formatter = logging.Formatter(fmt)

    if opts.debug:
        handler = logging.StreamHandler()
    else:
        import logging.handlers
        # WatchedFileHandler reopens the file if logrotate moves it.
        log_file = "/var/log/synnefo/dispatcher.log"
        handler = logging.handlers.WatchedFileHandler(log_file)

    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)
327

    
328

    
329
def main():
330
    (opts, args) = parse_arguments(sys.argv[1:])
331

    
332
    # Rename this process so 'ps' output looks like this is a native
333
    # executable.  Can not seperate command-line arguments from actual name of
334
    # the executable by NUL bytes, so only show the name of the executable
335
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
336
    setproctitle.setproctitle(sys.argv[0])
337
    setup_logging(opts)
338

    
339
    # Init the global variables containing the queues
340
    _init_queues()
341

    
342
    # Special case for the clean up queues action
343
    if opts.purge_queues:
344
        purge_queues()
345
        return
346

    
347
    # Special case for the clean up exch action
348
    if opts.purge_exchanges:
349
        purge_exchanges()
350
        return
351

    
352
    if opts.drain_queue:
353
        drain_queue(opts.drain_queue)
354
        return
355

    
356
    # Debug mode, process messages without daemonizing
357
    if opts.debug:
358
        debug_mode()
359
        return
360

    
361
    # Create pidfile,
362
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
363

    
364
    if daemon.runner.is_pidfile_stale(pidf):
365
        log.warning("Removing stale PID lock file %s", pidf.path)
366
        pidf.break_lock()
367

    
368
    files_preserve = []
369
    for handler in log.handlers:
370
        stream = getattr(handler, 'stream')
371
        if stream and hasattr(stream, 'fileno'):
372
            files_preserve.append(handler.stream)
373

    
374
    stderr_stream = None
375
    for handler in log.handlers:
376
        stream = getattr(handler, 'stream')
377
        if stream and hasattr(handler, 'baseFilename'):
378
            stderr_stream = stream
379
            break
380

    
381
    daemon_context = daemon.DaemonContext(
382
        pidfile=pidf,
383
        umask=0022,
384
        stdout=stderr_stream,
385
        stderr=stderr_stream,
386
        files_preserve=files_preserve)
387

    
388
    try:
389
        daemon_context.open()
390
    except (pidlockfile.AlreadyLocked, LockTimeout):
391
        log.critical("Failed to lock pidfile %s, another instance running?",
392
                     pidf.path)
393
        sys.exit(1)
394

    
395
    log.info("Became a daemon")
396

    
397
    if 'gevent' in sys.modules:
398
        # A fork() has occured while daemonizing. If running in
399
        # gevent context we *must* reinit gevent
400
        log.debug("gevent imported. Reinitializing gevent")
401
        import gevent
402
        gevent.reinit()
403

    
404
    # Catch every exception, make sure it gets logged properly
405
    try:
406
        daemon_mode(opts)
407
    except Exception:
408
        log.exception("Unknown error")
409
        raise
410

    
411
if __name__ == "__main__":
412
    sys.exit(main())
413

    
414
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :