Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 8ec69269

History | View | Annotate | Download (12.2 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
import logging
50
import time
51

    
52
import daemon
53
import daemon.runner
54
from lockfile import LockTimeout
55
# Take care of differences between python-daemon versions.
56
try:
57
    from daemon import pidfile as pidlockfile
58
except:
59
    from daemon import pidlockfile
60
import setproctitle
61

    
62
from synnefo.lib.amqp import AMQPClient
63
from synnefo.logic import callbacks
64

    
65
from synnefo.util.dictconfig import dictConfig
66
dictConfig(settings.DISPATCHER_LOGGING)
67
log = logging.getLogger()
68

    
69
# Queue names
70
QUEUES = []
71

    
72
# Queue bindings to exchanges
73
BINDINGS = []
74

    
75

    
76
class Dispatcher:
77
    debug = False
78

    
79
    def __init__(self, debug=False):
80
        self.debug = debug
81
        self._init()
82

    
83
    def wait(self):
84
        log.info("Waiting for messages..")
85
        while True:
86
            try:
87
                self.client.basic_wait()
88
            except SystemExit:
89
                break
90
            except Exception as e:
91
                log.exception("Caught unexpected exception: %s", e)
92

    
93
        self.client.basic_cancel()
94
        self.client.close()
95

    
96
    def _init(self):
97
        global QUEUES, BINDINGS
98
        log.info("Initializing")
99

    
100
        self.client = AMQPClient()
101
        # Connect to AMQP host
102
        self.client.connect()
103

    
104
        # Declare queues and exchanges
105
        for exchange in settings.EXCHANGES:
106
            self.client.exchange_declare(exchange=exchange,
107
                                         type="topic")
108

    
109
        for queue in QUEUES:
110
            # Queues are mirrored to all RabbitMQ brokers
111
            self.client.queue_declare(queue=queue, mirrored=True)
112

    
113
        bindings = BINDINGS
114

    
115
        # Bind queues to handler methods
116
        for binding in bindings:
117
            try:
118
                callback = getattr(callbacks, binding[3])
119
            except AttributeError:
120
                log.error("Cannot find callback %s", binding[3])
121
                raise SystemExit(1)
122

    
123
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
124
                                   routing_key=binding[2])
125

    
126
            self.client.basic_consume(queue=binding[0],
127
                                                        callback=callback)
128

    
129
            log.debug("Binding %s(%s) to queue %s with handler %s",
130
                      binding[1], binding[2], binding[0], binding[3])
131

    
132

    
133
def _init_queues():
134
    global QUEUES, BINDINGS
135

    
136
    # Queue declarations
137
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
138

    
139
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
140
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
141
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
142
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
143
    QUEUE_RECONC = "%s-reconciliation" % prefix
144
    if settings.DEBUG is True:
145
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
146

    
147
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
148
              QUEUE_GANETI_BUILD_PROGR)
149

    
150
    # notifications of type "ganeti-op-status"
151
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
152
    # notifications of type "ganeti-network-status"
153
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
154
    # notifications of type "ganeti-net-status"
155
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
156
    # notifications of type "ganeti-create-progress"
157
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
158
    # reconciliation
159
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
160

    
161
    BINDINGS = [
162
    # Queue                   # Exchange                # RouteKey              # Handler
163
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
164
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
165
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
166
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
167
    ]
168

    
169
    if settings.DEBUG is True:
170
        BINDINGS += [
171
            # Queue       # Exchange          # RouteKey  # Handler
172
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
173
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
174
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
175
        ]
176
        QUEUES += (QUEUE_DEBUG,)
177

    
178

    
179
def parse_arguments(args):
180
    from optparse import OptionParser
181

    
182
    default_pid_file = \
183
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
184
    parser = OptionParser()
185
    parser.add_option("-d", "--debug", action="store_true", default=False,
186
                      dest="debug", help="Enable debug mode")
187
    parser.add_option("-w", "--workers", default=2, dest="workers",
188
                      help="Number of workers to spawn", type="int")
189
    parser.add_option("-p", "--pid-file", dest="pid_file",
190
                      default=default_pid_file,
191
                      help="Save PID to file (default: %s)" % default_pid_file)
192
    parser.add_option("--purge-queues", action="store_true",
193
                      default=False, dest="purge_queues",
194
                      help="Remove all declared queues (DANGEROUS!)")
195
    parser.add_option("--purge-exchanges", action="store_true",
196
                      default=False, dest="purge_exchanges",
197
                      help="Remove all exchanges. Implies deleting all queues \
198
                           first (DANGEROUS!)")
199
    parser.add_option("--drain-queue", dest="drain_queue",
200
                      help="Strips a queue from all outstanding messages")
201

    
202
    return parser.parse_args(args)
203

    
204

    
205
def purge_queues():
206
    """
207
        Delete declared queues from RabbitMQ. Use with care!
208
    """
209
    global QUEUES, BINDINGS
210
    client = AMQPClient()
211
    client.connect()
212

    
213
    print "Queues to be deleted: ", QUEUES
214

    
215
    if not get_user_confirmation():
216
        return
217

    
218
    for queue in QUEUES:
219
        result = client.queue_delete(queue=queue)
220
        print "Deleting queue %s. Result: %s" % (queue, result)
221

    
222
    client.close()
223

    
224

    
225
def purge_exchanges():
226
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
227
    global QUEUES, BINDINGS
228
    purge_queues()
229

    
230
    client = AMQPClient()
231
    client.connect()
232

    
233
    print "Exchanges to be deleted: ", settings.EXCHANGES
234

    
235
    if not get_user_confirmation():
236
        return
237

    
238
    for exchange in settings.EXCHANGES:
239
        result = client.exchange_delete(exchange=exchange)
240
        print "Deleting exchange %s. Result: %s" % (exchange, result)
241

    
242
    client.close()
243

    
244

    
245
def drain_queue(queue):
246
    """Strip a (declared) queue from all outstanding messages"""
247
    global QUEUES, BINDINGS
248
    if not queue:
249
        return
250

    
251
    if not queue in QUEUES:
252
        print "Queue %s not configured" % queue
253
        return
254

    
255
    print "Queue to be drained: %s" % queue
256

    
257
    if not get_user_confirmation():
258
        return
259

    
260
    client = AMQPClient()
261
    client.connect()
262

    
263
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
264

    
265
    print "Queue draining about to start, hit Ctrl+c when done"
266
    time.sleep(2)
267
    print "Queue draining starting"
268

    
269
    num_processed = 0
270
    while True:
271
        client.basic_wait()
272
        num_processed += 1
273
        sys.stderr.write("Ignored %d messages\r" % num_processed)
274

    
275
    client.basic_cancel(tag)
276
    client.close()
277

    
278

    
279
def get_user_confirmation():
280
    ans = raw_input("Are you sure (N/y):")
281

    
282
    if not ans:
283
        return False
284
    if ans not in ['Y', 'y']:
285
        return False
286
    return True
287

    
288

    
289
def debug_mode():
290
    disp = Dispatcher(debug=True)
291
    disp.wait()
292

    
293

    
294
def daemon_mode(opts):
295
    disp = Dispatcher(debug=False)
296
    disp.wait()
297

    
298

    
299
def main():
300
    (opts, args) = parse_arguments(sys.argv[1:])
301

    
302
    # Rename this process so 'ps' output looks like this is a native
303
    # executable.  Can not seperate command-line arguments from actual name of
304
    # the executable by NUL bytes, so only show the name of the executable
305
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
306
    setproctitle.setproctitle(sys.argv[0])
307

    
308
    if opts.debug:
309
        stream_handler = logging.StreamHandler()
310
        formatter = logging.Formatter("%(asctime)s %(module)s %(levelname)s: %(message)s",
311
                                      "%Y-%m-%d %H:%M:%S")
312
        stream_handler.setFormatter(formatter)
313
        log.addHandler(stream_handler)
314

    
315
    # Init the global variables containing the queues
316
    _init_queues()
317

    
318
    # Special case for the clean up queues action
319
    if opts.purge_queues:
320
        purge_queues()
321
        return
322

    
323
    # Special case for the clean up exch action
324
    if opts.purge_exchanges:
325
        purge_exchanges()
326
        return
327

    
328
    if opts.drain_queue:
329
        drain_queue(opts.drain_queue)
330
        return
331

    
332
    # Debug mode, process messages without daemonizing
333
    if opts.debug:
334
        debug_mode()
335
        return
336

    
337
    # Create pidfile,
338
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
339

    
340
    if daemon.runner.is_pidfile_stale(pidf):
341
        log.warning("Removing stale PID lock file %s", pidf.path)
342
        pidf.break_lock()
343

    
344
    files_preserve = []
345
    for handler in log.handlers:
346
        stream = getattr(handler, 'stream')
347
        if stream and hasattr(stream, 'fileno'):
348
            files_preserve.append(handler.stream)
349

    
350
    stderr_stream = None
351
    for handler in log.handlers:
352
        stream = getattr(handler, 'stream')
353
        if stream and hasattr(handler, 'baseFilename'):
354
            stderr_stream = stream
355
            break
356

    
357
    daemon_context = daemon.DaemonContext(
358
        pidfile=pidf,
359
        umask=0022,
360
        stdout=stderr_stream,
361
        stderr=stderr_stream,
362
        files_preserve=files_preserve)
363

    
364
    try:
365
        daemon_context.open()
366
    except (pidlockfile.AlreadyLocked, LockTimeout):
367
        log.critical("Failed to lock pidfile %s, another instance running?",
368
                     pidf.path)
369
        sys.exit(1)
370

    
371
    log.info("Became a daemon")
372

    
373
    if 'gevent' in sys.modules:
374
        # A fork() has occured while daemonizing. If running in
375
        # gevent context we *must* reinit gevent
376
        log.debug("gevent imported. Reinitializing gevent")
377
        import gevent
378
        gevent.reinit()
379

    
380
    # Catch every exception, make sure it gets logged properly
381
    try:
382
        daemon_mode(opts)
383
    except Exception:
384
        log.exception("Unknown error")
385
        raise
386

    
387
if __name__ == "__main__":
388
    sys.exit(main())
389

    
390
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :