Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 1dc821c9

History | View | Annotate | Download (12 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
import logging
50
import time
51

    
52
import daemon
53
import daemon.runner
54
from lockfile import LockTimeout
55
# Take care of differences between python-daemon versions.
56
try:
57
    from daemon import pidfile as pidlockfile
58
except:
59
    from daemon import pidlockfile
60
import setproctitle
61

    
62
from synnefo.lib.amqp import AMQPClient
63
from synnefo.logic import callbacks
64

    
65
from synnefo.util.dictconfig import dictConfig
66
dictConfig(settings.DISPATCHER_LOGGING)
67
log = logging.getLogger()
68

    
69
# Queue names
70
QUEUES = []
71

    
72
# Queue bindings to exchanges
73
BINDINGS = []
74

    
75

    
76
class Dispatcher:
77
    debug = False
78
    client_promises = []
79

    
80
    def __init__(self, debug=False):
81
        self.debug = debug
82
        self._init()
83

    
84
    def wait(self):
85
        log.info("Waiting for messages..")
86
        while True:
87
            try:
88
                self.client.basic_wait()
89
            except SystemExit:
90
                break
91
            except Exception as e:
92
                log.exception("Caught unexpected exception: %s", e)
93

    
94
        self.client.basic_cancel()
95
        self.client.close()
96

    
97
    def _init(self):
98
        global QUEUES, BINDINGS
99
        log.info("Initializing")
100

    
101
        self.client = AMQPClient()
102
        # Connect to AMQP host
103
        self.client.connect()
104

    
105
        # Declare queues and exchanges
106
        for exchange in settings.EXCHANGES:
107
            self.client.exchange_declare(exchange=exchange,
108
                                         type="topic")
109

    
110
        for queue in QUEUES:
111
            # Queues are mirrored to all RabbitMQ brokers
112
            self.client.queue_declare(queue=queue, mirrored=True)
113

    
114
        bindings = BINDINGS
115

    
116
        # Bind queues to handler methods
117
        for binding in bindings:
118
            try:
119
                callback = getattr(callbacks, binding[3])
120
            except AttributeError:
121
                log.error("Cannot find callback %s", binding[3])
122
                raise SystemExit(1)
123

    
124
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
125
                                   routing_key=binding[2])
126

    
127
            consume_promise = self.client.basic_consume(queue=binding[0],
128
                                                        callback=callback)
129

    
130
            log.debug("Binding %s(%s) to queue %s with handler %s",
131
                      binding[1], binding[2], binding[0], binding[3])
132
            self.client_promises.append(consume_promise)
133

    
134

    
135
def _init_queues():
136
    global QUEUES, BINDINGS
137

    
138
    # Queue declarations
139
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
140

    
141
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
142
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
143
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
144
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
145
    QUEUE_RECONC = "%s-reconciliation" % prefix
146
    if settings.DEBUG is True:
147
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
148

    
149
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
150
              QUEUE_GANETI_BUILD_PROGR)
151

    
152
    # notifications of type "ganeti-op-status"
153
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
154
    # notifications of type "ganeti-network-status"
155
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
156
    # notifications of type "ganeti-net-status"
157
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
158
    # notifications of type "ganeti-create-progress"
159
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
160
    # reconciliation
161
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
162

    
163
    BINDINGS = [
164
    # Queue                   # Exchange                # RouteKey              # Handler
165
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
166
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
167
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
168
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
169
    ]
170

    
171
    if settings.DEBUG is True:
172
        BINDINGS += [
173
            # Queue       # Exchange          # RouteKey  # Handler
174
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
175
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
176
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
177
        ]
178
        QUEUES += (QUEUE_DEBUG,)
179

    
180

    
181
def parse_arguments(args):
182
    from optparse import OptionParser
183

    
184
    default_pid_file = \
185
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
186
    parser = OptionParser()
187
    parser.add_option("-d", "--debug", action="store_true", default=False,
188
                      dest="debug", help="Enable debug mode")
189
    parser.add_option("-w", "--workers", default=2, dest="workers",
190
                      help="Number of workers to spawn", type="int")
191
    parser.add_option("-p", "--pid-file", dest="pid_file",
192
                      default=default_pid_file,
193
                      help="Save PID to file (default: %s)" % default_pid_file)
194
    parser.add_option("--purge-queues", action="store_true",
195
                      default=False, dest="purge_queues",
196
                      help="Remove all declared queues (DANGEROUS!)")
197
    parser.add_option("--purge-exchanges", action="store_true",
198
                      default=False, dest="purge_exchanges",
199
                      help="Remove all exchanges. Implies deleting all queues \
200
                           first (DANGEROUS!)")
201
    parser.add_option("--drain-queue", dest="drain_queue",
202
                      help="Strips a queue from all outstanding messages")
203

    
204
    return parser.parse_args(args)
205

    
206

    
207
def purge_queues():
208
    """
209
        Delete declared queues from RabbitMQ. Use with care!
210
    """
211
    global QUEUES, BINDINGS
212
    client = AMQPClient()
213
    client.connect()
214

    
215
    print "Queues to be deleted: ", QUEUES
216

    
217
    if not get_user_confirmation():
218
        return
219

    
220
    for queue in QUEUES:
221
        result = client.queue_delete(queue=queue)
222
        print "Deleting queue %s. Result: %s" % (queue, result)
223

    
224
    client.close()
225

    
226

    
227
def purge_exchanges():
228
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
229
    global QUEUES, BINDINGS
230
    purge_queues()
231

    
232
    client = AMQPClient()
233
    client.connect()
234

    
235
    print "Exchanges to be deleted: ", settings.EXCHANGES
236

    
237
    if not get_user_confirmation():
238
        return
239

    
240
    for exchange in settings.EXCHANGES:
241
        result = client.exchange_delete(exchange=exchange)
242
        print "Deleting exchange %s. Result: %s" % (exchange, result)
243

    
244
    client.close()
245

    
246

    
247
def drain_queue(queue):
248
    """Strip a (declared) queue from all outstanding messages"""
249
    global QUEUES, BINDINGS
250
    if not queue:
251
        return
252

    
253
    if not queue in QUEUES:
254
        print "Queue %s not configured" % queue
255
        return
256

    
257
    print "Queue to be drained: %s" % queue
258

    
259
    if not get_user_confirmation():
260
        return
261

    
262
    client = AMQPClient()
263
    client.connect()
264

    
265
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
266

    
267
    print "Queue draining about to start, hit Ctrl+c when done"
268
    time.sleep(2)
269
    print "Queue draining starting"
270

    
271
    num_processed = 0
272
    while True:
273
        client.basic_wait()
274
        num_processed += 1
275
        sys.stderr.write("Ignored %d messages\r" % num_processed)
276

    
277
    client.basic_cancel(tag)
278
    client.close()
279

    
280

    
281
def get_user_confirmation():
282
    ans = raw_input("Are you sure (N/y):")
283

    
284
    if not ans:
285
        return False
286
    if ans not in ['Y', 'y']:
287
        return False
288
    return True
289

    
290

    
291
def debug_mode():
292
    disp = Dispatcher(debug=True)
293
    disp.wait()
294

    
295

    
296
def daemon_mode(opts):
297
    disp = Dispatcher(debug=False)
298
    disp.wait()
299

    
300

    
301
def main():
302
    (opts, args) = parse_arguments(sys.argv[1:])
303

    
304
    # Rename this process so 'ps' output looks like this is a native
305
    # executable.  Can not seperate command-line arguments from actual name of
306
    # the executable by NUL bytes, so only show the name of the executable
307
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
308
    setproctitle.setproctitle(sys.argv[0])
309

    
310
    # Init the global variables containing the queues
311
    _init_queues()
312

    
313
    # Special case for the clean up queues action
314
    if opts.purge_queues:
315
        purge_queues()
316
        return
317

    
318
    # Special case for the clean up exch action
319
    if opts.purge_exchanges:
320
        purge_exchanges()
321
        return
322

    
323
    if opts.drain_queue:
324
        drain_queue(opts.drain_queue)
325
        return
326

    
327
    # Debug mode, process messages without daemonizing
328
    if opts.debug:
329
        debug_mode()
330
        return
331

    
332
    # Create pidfile,
333
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
334

    
335
    if daemon.runner.is_pidfile_stale(pidf):
336
        log.warning("Removing stale PID lock file %s", pidf.path)
337
        pidf.break_lock()
338

    
339
    files_preserve = []
340
    for handler in log.handlers:
341
        stream = getattr(handler, 'stream')
342
        if stream and hasattr(stream, 'fileno'):
343
            files_preserve.append(handler.stream)
344

    
345
    stderr_stream = None
346
    for handler in log.handlers:
347
        stream = getattr(handler, 'stream')
348
        if stream and hasattr(handler, 'baseFilename'):
349
            stderr_stream = stream
350
            break
351

    
352
    daemon_context = daemon.DaemonContext(
353
        pidfile=pidf,
354
        umask=0022,
355
        stdout=stderr_stream,
356
        stderr=stderr_stream,
357
        files_preserve=files_preserve)
358

    
359
    try:
360
        daemon_context.open()
361
    except (pidlockfile.AlreadyLocked, LockTimeout):
362
        log.critical("Failed to lock pidfile %s, another instance running?",
363
                     pidf.path)
364
        sys.exit(1)
365

    
366
    log.info("Became a daemon")
367

    
368
    if 'gevent' in sys.modules:
369
        # A fork() has occured while daemonizing. If running in
370
        # gevent context we *must* reinit gevent
371
        log.debug("gevent imported. Reinitializing gevent")
372
        import gevent
373
        gevent.reinit()
374

    
375
    # Catch every exception, make sure it gets logged properly
376
    try:
377
        daemon_mode(opts)
378
    except Exception:
379
        log.exception("Unknown error")
380
        raise
381

    
382
if __name__ == "__main__":
383
    sys.exit(main())
384

    
385
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :