Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 3f018af1

History | View | Annotate | Download (11.6 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
import logging
import time

import daemon
import daemon.runner
from lockfile import LockTimeout
# Take care of differences between python-daemon versions: newer releases
# expose the module as daemon.pidfile, older ones as daemon.pidlockfile.
# The bare except is deliberate best-effort fallback for old versions.
try:
    from daemon import pidfile as pidlockfile
except:
    from daemon import pidlockfile

from synnefo.lib.amqp import AMQPClient
from synnefo.logic import callbacks

# Configure logging from the Django settings before grabbing the root logger.
from synnefo.util.dictconfig import dictConfig
dictConfig(settings.DISPATCHER_LOGGING)
log = logging.getLogger()

# Queue names; populated by _init_queues() at startup.
QUEUES = []

# Queue bindings to exchanges, as (queue, exchange, routing_key, handler)
# tuples; populated by _init_queues() at startup.
BINDINGS = []
73

    
74

    
75
class Dispatcher:
    """Consume messages from the configured AMQP queues and dispatch
    each one to its bound handler callback from synnefo.logic.callbacks.
    """

    debug = False

    def __init__(self, debug=False):
        self.debug = debug
        self._init()

    def wait(self):
        """Run the message loop until a SystemExit is raised, then
        cancel the consumers and close the AMQP connection."""
        log.info("Waiting for messages..")
        while True:
            try:
                self.client.basic_wait()
            except SystemExit:
                break
            except Exception as exc:
                # Log and keep serving; only SystemExit stops the loop.
                log.exception("Caught unexpected exception: %s", exc)

        self.client.basic_cancel()
        self.client.close()

    def _init(self):
        """Connect to the broker, declare exchanges/queues and bind
        each queue to its handler callback."""
        global QUEUES, BINDINGS
        log.info("Initializing")

        # Connect to AMQP host
        self.client = AMQPClient()
        self.client.connect()

        # Declare every configured exchange as a topic exchange
        for exchange in settings.EXCHANGES:
            self.client.exchange_declare(exchange=exchange,
                                         type="topic")

        for queue in QUEUES:
            # Queues are mirrored to all RabbitMQ brokers
            self.client.queue_declare(queue=queue, mirrored=True)

        # Bind queues to handler methods
        for queue, exchange, routing_key, handler_name in BINDINGS:
            try:
                callback = getattr(callbacks, handler_name)
            except AttributeError:
                log.error("Cannot find callback %s", handler_name)
                raise SystemExit(1)

            self.client.queue_bind(queue=queue, exchange=exchange,
                                   routing_key=routing_key)
            self.client.basic_consume(queue=queue,
                                      callback=callback)

            log.debug("Binding %s(%s) to queue %s with handler %s",
                      exchange, routing_key, queue, handler_name)
130

    
131

    
132
def _init_queues():
    """Populate the global QUEUES and BINDINGS from the Django settings.

    Queue and routing-key names are namespaced by the backend prefix so
    several deployments can share one broker.
    """
    global QUEUES, BINDINGS

    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    # Queue declarations
    queue_events_op = "%s-events-op" % prefix
    queue_events_network = "%s-events-network" % prefix
    queue_events_net = "%s-events-net" % prefix
    queue_build_progress = "%s-events-progress" % prefix
    queue_reconciliation = "%s-reconciliation" % prefix

    QUEUES = (queue_events_op, queue_events_network, queue_events_net,
              queue_reconciliation, queue_build_progress)

    # Routing keys per notification type
    key_op = 'ganeti.%s.event.op' % prefix            # "ganeti-op-status"
    key_network = 'ganeti.%s.event.network' % prefix  # "ganeti-network-status"
    key_net = 'ganeti.%s.event.net' % prefix          # "ganeti-net-status"
    key_progress = 'ganeti.%s.event.progress' % prefix  # "ganeti-create-progress"
    key_reconciliation = 'reconciliation.%s.*' % prefix  # reconciliation

    BINDINGS = [
        # Queue                 # Exchange                # RouteKey    # Handler
        (queue_events_op,       settings.EXCHANGE_GANETI, key_op,       'update_db'),
        (queue_events_network,  settings.EXCHANGE_GANETI, key_network,  'update_network'),
        (queue_events_net,      settings.EXCHANGE_GANETI, key_net,      'update_net'),
        (queue_build_progress,  settings.EXCHANGE_GANETI, key_progress, 'update_build_progress'),
    ]

    if settings.DEBUG is True:
        # Debug queue, retrieves all messages from every exchange
        queue_debug = "%s-debug" % prefix
        BINDINGS += [
            # Queue        # Exchange                # RouteKey  # Handler
            (queue_debug,  settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
            (queue_debug,  settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
            (queue_debug,  settings.EXCHANGE_API,    '#',  'dummy_proc'),
        ]
        QUEUES += (queue_debug,)
176

    
177

    
178
def parse_arguments(args):
    """Parse the dispatcher's command line.

    Returns the (options, positional_args) pair produced by
    OptionParser.parse_args.
    """
    from optparse import OptionParser

    # Strip the leading "." so the default is an absolute path.
    default_pid_file = \
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]

    opt_parser = OptionParser()
    opt_parser.add_option(
        "-d", "--debug", action="store_true", default=False,
        dest="debug", help="Enable debug mode")
    opt_parser.add_option(
        "-w", "--workers", default=2, dest="workers",
        help="Number of workers to spawn", type="int")
    opt_parser.add_option(
        "-p", "--pid-file", dest="pid_file",
        default=default_pid_file,
        help="Save PID to file (default: %s)" % default_pid_file)
    opt_parser.add_option(
        "--purge-queues", action="store_true",
        default=False, dest="purge_queues",
        help="Remove all declared queues (DANGEROUS!)")
    opt_parser.add_option(
        "--purge-exchanges", action="store_true",
        default=False, dest="purge_exchanges",
        help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    opt_parser.add_option(
        "--drain-queue", dest="drain_queue",
        help="Strips a queue from all outstanding messages")

    return opt_parser.parse_args(args)
202

    
203

    
204
def purge_queues():
205
    """
206
        Delete declared queues from RabbitMQ. Use with care!
207
    """
208
    global QUEUES, BINDINGS
209
    client = AMQPClient()
210
    client.connect()
211

    
212
    print "Queues to be deleted: ", QUEUES
213

    
214
    if not get_user_confirmation():
215
        return
216

    
217
    for queue in QUEUES:
218
        result = client.queue_delete(queue=queue)
219
        print "Deleting queue %s. Result: %s" % (queue, result)
220

    
221
    client.close()
222

    
223

    
224
def purge_exchanges():
225
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
226
    global QUEUES, BINDINGS
227
    purge_queues()
228

    
229
    client = AMQPClient()
230
    client.connect()
231

    
232
    print "Exchanges to be deleted: ", settings.EXCHANGES
233

    
234
    if not get_user_confirmation():
235
        return
236

    
237
    for exchange in settings.EXCHANGES:
238
        result = client.exchange_delete(exchange=exchange)
239
        print "Deleting exchange %s. Result: %s" % (exchange, result)
240

    
241
    client.close()
242

    
243

    
244
def drain_queue(queue):
245
    """Strip a (declared) queue from all outstanding messages"""
246
    global QUEUES, BINDINGS
247
    if not queue:
248
        return
249

    
250
    if not queue in QUEUES:
251
        print "Queue %s not configured" % queue
252
        return
253

    
254
    print "Queue to be drained: %s" % queue
255

    
256
    if not get_user_confirmation():
257
        return
258

    
259
    client = AMQPClient()
260
    client.connect()
261

    
262
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
263

    
264
    print "Queue draining about to start, hit Ctrl+c when done"
265
    time.sleep(2)
266
    print "Queue draining starting"
267

    
268
    num_processed = 0
269
    while True:
270
        client.basic_wait()
271
        num_processed += 1
272
        sys.stderr.write("Ignored %d messages\r" % num_processed)
273

    
274
    client.basic_cancel(tag)
275
    client.close()
276

    
277

    
278
def get_user_confirmation():
    """Prompt the operator and return True only on an explicit yes.

    An empty answer (just hitting Enter) or anything other than
    'y'/'Y' counts as "no".
    """
    answer = raw_input("Are you sure (N/y):")
    # Empty string is not in the tuple, so the default is False.
    return answer in ('y', 'Y')
286

    
287

    
288
def debug_mode():
    """Run the dispatcher in the foreground with debugging enabled."""
    Dispatcher(debug=True).wait()
291

    
292

    
293
def daemon_mode(opts):
    """Run the dispatcher message loop inside the daemonized process.

    `opts` is accepted for interface symmetry with the other run modes;
    it is not consulted here.
    """
    Dispatcher(debug=False).wait()
296

    
297

    
298
def main():
299
    (opts, args) = parse_arguments(sys.argv[1:])
300

    
301
    # Init the global variables containing the queues
302
    _init_queues()
303

    
304
    # Special case for the clean up queues action
305
    if opts.purge_queues:
306
        purge_queues()
307
        return
308

    
309
    # Special case for the clean up exch action
310
    if opts.purge_exchanges:
311
        purge_exchanges()
312
        return
313

    
314
    if opts.drain_queue:
315
        drain_queue(opts.drain_queue)
316
        return
317

    
318
    # Debug mode, process messages without daemonizing
319
    if opts.debug:
320
        debug_mode()
321
        return
322

    
323
    # Create pidfile,
324
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
325

    
326
    if daemon.runner.is_pidfile_stale(pidf):
327
        log.warning("Removing stale PID lock file %s", pidf.path)
328
        pidf.break_lock()
329

    
330
    files_preserve = []
331
    for handler in log.handlers:
332
        stream = getattr(handler, 'stream')
333
        if stream and hasattr(stream, 'fileno'):
334
            files_preserve.append(handler.stream)
335

    
336
    stderr_stream = None
337
    for handler in log.handlers:
338
        stream = getattr(handler, 'stream')
339
        if stream and hasattr(handler, 'baseFilename'):
340
            stderr_stream = stream
341
            break
342

    
343
    daemon_context = daemon.DaemonContext(
344
        pidfile=pidf,
345
        umask=0022,
346
        stdout=stderr_stream,
347
        stderr=stderr_stream,
348
        files_preserve=files_preserve)
349

    
350
    try:
351
        daemon_context.open()
352
    except (pidlockfile.AlreadyLocked, LockTimeout):
353
        log.critical("Failed to lock pidfile %s, another instance running?",
354
                     pidf.path)
355
        sys.exit(1)
356

    
357
    log.info("Became a daemon")
358

    
359
    if 'gevent' in sys.modules:
360
        # A fork() has occured while daemonizing. If running in
361
        # gevent context we *must* reinit gevent
362
        log.debug("gevent imported. Reinitializing gevent")
363
        import gevent
364
        gevent.reinit()
365

    
366
    # Catch every exception, make sure it gets logged properly
367
    try:
368
        daemon_mode(opts)
369
    except Exception:
370
        log.exception("Unknown error")
371
        raise
372

    
373
if __name__ == "__main__":
    # main() returns None on the normal paths, so the exit status is 0;
    # error paths call sys.exit(1) themselves or let the exception escape.
    sys.exit(main())
375

    
376
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :