#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#   1. Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.

"""Message queue setup, dispatch and admin

This program sets up connections to the queues configured in settings.py
and implements the message wait and dispatch loops. Actual messages are
handled in the dispatched functions.

"""
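# Example invocations, based on the options defined in parse_arguments() below
# (paths shown are illustrative, not mandated by this file):
#
#     ./dispatcher.py                      # daemonize and fork 2 workers (default)
#     ./dispatcher.py -d                   # debug mode: single foreground process
#     ./dispatcher.py -w 4 -p /path/to/pid # four workers, custom pid file
#     ./dispatcher.py --drain-queue=QUEUE  # discard all messages in QUEUE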
from django.core.management import setup_environ

# Fix path to import synnefo settings
import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)

from synnefo import settings
setup_environ(settings)

import logging
import time

import daemon.runner
import daemon.daemon
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM

# Take care of differences between python-daemon versions.
try:
    from daemon import pidfile as pidlockfile
except ImportError:
    from daemon import pidlockfile

from synnefo.lib.amqp import AMQPClient
from synnefo.logic import callbacks
from synnefo.util.dictconfig import dictConfig


log = logging.getLogger()


# Queue names
QUEUES = []

# Queue bindings to exchanges
BINDINGS = []

    
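# Overview of the code below: a Dispatcher opens one AMQP connection, declares
# the configured exchanges and queues, registers a consumer for every BINDINGS
# entry (a tuple of queue, exchange, routing key and handler name, built in
# _init_queues()), and then blocks in wait() until a signal handler raises
# SystemExit.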
class Dispatcher:
    debug = False
    client_promises = []

    def __init__(self, debug=False):
        self.debug = debug
        self._init()

    def wait(self):
        log.info("Waiting for messages..")
        while True:
            try:
                self.client.basic_wait()
            except SystemExit:
                break
            except Exception as e:
                log.exception("Caught unexpected exception: %s", e)

        self.client.basic_cancel()
        self.client.close()

    def _init(self):
        global QUEUES, BINDINGS
        log.info("Initializing")

        self.client = AMQPClient()
        # Connect to AMQP host
        self.client.connect()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.client.exchange_declare(exchange=exchange,
                                         type="topic")

        for queue in QUEUES:
            # Queues are mirrored to all RabbitMQ brokers
            self.client.queue_declare(queue=queue, mirrored=True)

        bindings = BINDINGS

        # Bind queues to handler methods
        for binding in bindings:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                log.error("Cannot find callback %s", binding[3])
                raise SystemExit(1)

            self.client.queue_bind(queue=binding[0], exchange=binding[1],
                                   routing_key=binding[2])

            consume_promise = self.client.basic_consume(queue=binding[0],
                                                        callback=callback)

            log.debug("Binding %s(%s) to queue %s with handler %s",
                      binding[1], binding[2], binding[0], binding[3])
            self.client_promises.append(consume_promise)

    
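# Queue and routing-key names are derived from settings.BACKEND_PREFIX_ID.
# Purely for illustration, a prefix of "snf" would yield queues such as
# "snf-events-op" and routing keys such as "ganeti.snf.event.op".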
def _init_queues():
    global QUEUES, BINDINGS

    # Queue declarations
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
    QUEUE_RECONC = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages

    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
              QUEUE_GANETI_BUILD_PROGR)

    # notifications of type "ganeti-op-status"
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
    # notifications of type "ganeti-net-status"
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
    # notifications of type "ganeti-create-progress"
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
    # reconciliation
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix

    BINDINGS = [
        # Queue                     # Exchange                 # RouteKey             # Handler
        (QUEUE_GANETI_EVENTS_OP,   settings.EXCHANGE_GANETI,  DB_HANDLER_KEY_OP,     'update_db'),
        (QUEUE_GANETI_EVENTS_NET,  settings.EXCHANGE_GANETI,  DB_HANDLER_KEY_NET,    'update_net'),
        (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI,  BUILD_MONITOR_HANDLER, 'update_build_progress'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue       # Exchange                # RouteKey  # Handler
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',        'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',        'dummy_proc'),
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',        'dummy_proc'),
        ]
        QUEUES += (QUEUE_DEBUG,)

    
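# Signal handling: _exit_handler is installed in the worker processes (and in
# the debug and drain modes) and turns SIGINT/SIGTERM into SystemExit so the
# wait loop can shut down cleanly; _parent_handler is installed in the
# daemonized parent and forwards SIGTERM to the forked children.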
def _exit_handler(signum, frame):
    """Catch exit signal in child processes"""
    log.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit


def _parent_handler(signum, frame):
    """Catch exit signal in parent process and forward it to children."""
    global children
    log.info("Caught signal %d, sending SIGTERM to children %s",
             signum, children)
    [os.kill(pid, SIGTERM) for pid in children]


def child(cmdline):
    """The context of the child process"""

    # Cmd line argument parsing
    (opts, args) = parse_arguments(cmdline)
    disp = Dispatcher(debug=opts.debug)

    # Start the event loop
    disp.wait()

def parse_arguments(args):
    from optparse import OptionParser

    default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid")
    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", default=False,
                      dest="debug", help="Enable debug mode")
    parser.add_option("-w", "--workers", default=2, dest="workers",
                      help="Number of workers to spawn", type="int")
    parser.add_option("-p", "--pid-file", dest="pid_file",
                      default=default_pid_file,
                      help="Save PID to file (default: %s)" % default_pid_file)
    parser.add_option("--purge-queues", action="store_true",
                      default=False, dest="purge_queues",
                      help="Remove all declared queues (DANGEROUS!)")
    parser.add_option("--purge-exchanges", action="store_true",
                      default=False, dest="purge_exchanges",
                      help="Remove all exchanges. Implies deleting all queues "
                           "first (DANGEROUS!)")
    parser.add_option("--drain-queue", dest="drain_queue",
                      help="Strips a queue from all outstanding messages")

    return parser.parse_args(args)

def purge_queues():
    """Delete declared queues from RabbitMQ. Use with care!"""
    global QUEUES, BINDINGS
    client = AMQPClient()
    client.connect()

    print "Queues to be deleted: ", QUEUES

    if not get_user_confirmation():
        return

    for queue in QUEUES:
        result = client.queue_delete(queue=queue)
        print "Deleting queue %s. Result: %s" % (queue, result)

    client.close()

def purge_exchanges():
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
    global QUEUES, BINDINGS
    purge_queues()

    client = AMQPClient()
    client.connect()

    print "Exchanges to be deleted: ", settings.EXCHANGES

    if not get_user_confirmation():
        return

    for exchange in settings.EXCHANGES:
        result = client.exchange_delete(exchange=exchange)
        print "Deleting exchange %s. Result: %s" % (exchange, result)

    client.close()

    
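# drain_queue() consumes the given queue with callbacks.dummy_proc, so every
# outstanding message is received and dropped (see the "Ignored %d messages"
# counter below) instead of being handled.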
def drain_queue(queue):
    """Strip a (declared) queue from all outstanding messages"""
    global QUEUES, BINDINGS
    if not queue:
        return

    if queue not in QUEUES:
        print "Queue %s not configured" % queue
        return

    print "Queue to be drained: %s" % queue

    if not get_user_confirmation():
        return

    client = AMQPClient()
    client.connect()

    # Register a temporary queue binding
    for binding in BINDINGS:
        if binding[0] == queue:
            exch = binding[1]

    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)

    print "Queue draining about to start, hit Ctrl+c when done"
    time.sleep(2)
    print "Queue draining starting"

    signal(SIGTERM, _exit_handler)
    signal(SIGINT, _exit_handler)

    num_processed = 0
    # _exit_handler turns SIGINT/SIGTERM into SystemExit; catch it here so the
    # consumer is cancelled and the connection is closed before returning.
    while True:
        try:
            client.basic_wait()
        except SystemExit:
            break
        num_processed += 1
        sys.stderr.write("Ignored %d messages\r" % num_processed)

    client.basic_cancel(tag)
    client.close()

def get_user_confirmation():
    ans = raw_input("Are you sure (N/y):")

    if not ans:
        return False
    if ans not in ['Y', 'y']:
        return False
    return True

def debug_mode():
    disp = Dispatcher(debug=True)
    signal(SIGINT, _exit_handler)
    signal(SIGTERM, _exit_handler)

    disp.wait()

    
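# daemon_mode() acquires the PID lock file, forks opts.workers children (each
# running its own Dispatcher via child()), installs _parent_handler so that
# SIGTERM is forwarded to the children, and waits for all of them to exit
# before releasing the lock.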
def daemon_mode(opts):
    global children

    # Create pidfile
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    if daemon.runner.is_pidfile_stale(pidf):
        log.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    try:
        pidf.acquire()
    except (pidlockfile.AlreadyLocked, LockTimeout):
        log.critical("Failed to lock pidfile %s, another instance running?",
                     pidf.path)
        sys.exit(1)

    log.info("Became a daemon")

    # Fork workers
    children = []

    i = 0
    while i < opts.workers:
        newpid = os.fork()

        if newpid == 0:
            signal(SIGINT, _exit_handler)
            signal(SIGTERM, _exit_handler)
            child(sys.argv[1:])
            sys.exit(1)
        else:
            log.debug("%d, forked child: %d", os.getpid(), newpid)
            children.append(newpid)
        i += 1

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, _parent_handler)
    signal(SIGTERM, _parent_handler)

    # Wait for all child processes to exit, one by one
    try:
        for pid in children:
            try:
                os.waitpid(pid, 0)
            except Exception:
                pass
    finally:
        pidf.release()

def main():
    (opts, args) = parse_arguments(sys.argv[1:])

    dictConfig(settings.DISPATCHER_LOGGING)

    global log

    # Init the global variables containing the queues
    _init_queues()

    # Special case for the clean up queues action
    if opts.purge_queues:
        purge_queues()
        return

    # Special case for the clean up exchanges action
    if opts.purge_exchanges:
        purge_exchanges()
        return

    if opts.drain_queue:
        drain_queue(opts.drain_queue)
        return

    # Debug mode, process messages without spawning workers
    if opts.debug:
        debug_mode()
        return

    # Keep the log file descriptors open across daemonization
    files_preserve = []
    for handler in log.handlers:
        stream = getattr(handler, 'stream', None)
        if stream and hasattr(stream, 'fileno'):
            files_preserve.append(handler.stream)

    daemon_context = daemon.daemon.DaemonContext(
        files_preserve=files_preserve,
        umask=022)

    daemon_context.open()

    # Catch every exception, make sure it gets logged properly
    try:
        daemon_mode(opts)
    except Exception:
        log.exception("Unknown error")
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :