Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ a17a8e98

History | View | Annotate | Download (12.9 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
# Fix path to import synnefo settings
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46

    
47
from synnefo import settings
48
setup_environ(settings)
49

    
50
import logging
51
import time
52

    
53
import daemon.runner
54
import daemon.daemon
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57

    
58
# Take care of differences between python-daemon versions.
59
try:
60
    from daemon import pidfile as pidlockfile
61
except:
62
    from daemon import pidlockfile
63

    
64
from synnefo.lib.amqp import AMQPClient
65
from synnefo.logic import callbacks
66
from synnefo.util.dictconfig import dictConfig
67

    
68

    
69
log = logging.getLogger()
70

    
71

    
72
# Queue names
73
QUEUES = []
74

    
75
# Queue bindings to exchanges
76
BINDINGS = []
77

    
78

    
79
class Dispatcher:
80
    debug = False
81
    client_promises = []
82

    
83
    def __init__(self, debug=False):
84
        self.debug = debug
85
        self._init()
86

    
87
    def wait(self):
88
        log.info("Waiting for messages..")
89
        while True:
90
            try:
91
                self.client.basic_wait()
92
            except SystemExit:
93
                break
94
            except Exception as e:
95
                log.exception("Caught unexpected exception: %s", e)
96

    
97
        self.client.basic_cancel()
98
        self.client.close()
99

    
100
    def _init(self):
101
        global QUEUES, BINDINGS
102
        log.info("Initializing")
103

    
104
        self.client = AMQPClient()
105
        # Connect to AMQP host
106
        self.client.connect()
107

    
108
        # Declare queues and exchanges
109
        for exchange in settings.EXCHANGES:
110
            self.client.exchange_declare(exchange=exchange,
111
                                         type="topic")
112

    
113
        for queue in QUEUES:
114
            # Queues are mirrored to all RabbitMQ brokers
115
            self.client.queue_declare(queue=queue, mirrored=True)
116

    
117
        bindings = BINDINGS
118

    
119
        # Bind queues to handler methods
120
        for binding in bindings:
121
            try:
122
                callback = getattr(callbacks, binding[3])
123
            except AttributeError:
124
                log.error("Cannot find callback %s", binding[3])
125
                raise SystemExit(1)
126

    
127
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
128
                                   routing_key=binding[2])
129

    
130
            consume_promise = self.client.basic_consume(queue=binding[0],
131
                                                        callback=callback)
132

    
133
            log.debug("Binding %s(%s) to queue %s with handler %s",
134
                      binding[1], binding[2], binding[0], binding[3])
135
            self.client_promises.append(consume_promise)
136

    
137

    
138
def _init_queues():
139
    global QUEUES, BINDINGS
140

    
141
    # Queue declarations
142
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
143

    
144
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
145
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
146
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
147
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
148
    QUEUE_RECONC = "%s-reconciliation" % prefix
149
    if settings.DEBUG is True:
150
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
151

    
152
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
153
              QUEUE_GANETI_BUILD_PROGR)
154

    
155
    # notifications of type "ganeti-op-status"
156
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
157
    # notifications of type "ganeti-network-status"
158
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
159
    # notifications of type "ganeti-net-status"
160
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
161
    # notifications of type "ganeti-create-progress"
162
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
163
    # reconciliation
164
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
165

    
166
    BINDINGS = [
167
    # Queue                   # Exchange                # RouteKey              # Handler
168
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
169
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
170
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
171
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
172
    ]
173

    
174
    if settings.DEBUG is True:
175
        BINDINGS += [
176
            # Queue       # Exchange          # RouteKey  # Handler
177
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
178
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
179
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
180
        ]
181
        QUEUES += (QUEUE_DEBUG,)
182

    
183

    
184
def _exit_handler(signum, frame):
185
    """"Catch exit signal in children processes"""
186
    log.info("Caught signal %d, will raise SystemExit", signum)
187
    raise SystemExit
188

    
189

    
190
def _parent_handler(signum, frame):
191
    """"Catch exit signal in parent process and forward it to children."""
192
    global children
193
    log.info("Caught signal %d, sending SIGTERM to children %s",
194
             signum, children)
195
    [os.kill(pid, SIGTERM) for pid in children]
196

    
197

    
198
def child(cmdline):
199
    """The context of the child process"""
200

    
201
    # Cmd line argument parsing
202
    (opts, args) = parse_arguments(cmdline)
203
    disp = Dispatcher(debug=opts.debug)
204

    
205
    # Start the event loop
206
    disp.wait()
207

    
208

    
209
def parse_arguments(args):
210
    from optparse import OptionParser
211

    
212
    default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid")
213
    parser = OptionParser()
214
    parser.add_option("-d", "--debug", action="store_true", default=False,
215
                      dest="debug", help="Enable debug mode")
216
    parser.add_option("-w", "--workers", default=2, dest="workers",
217
                      help="Number of workers to spawn", type="int")
218
    parser.add_option("-p", "--pid-file", dest="pid_file",
219
                      default=default_pid_file,
220
                      help="Save PID to file (default: %s)" % default_pid_file)
221
    parser.add_option("--purge-queues", action="store_true",
222
                      default=False, dest="purge_queues",
223
                      help="Remove all declared queues (DANGEROUS!)")
224
    parser.add_option("--purge-exchanges", action="store_true",
225
                      default=False, dest="purge_exchanges",
226
                      help="Remove all exchanges. Implies deleting all queues \
227
                           first (DANGEROUS!)")
228
    parser.add_option("--drain-queue", dest="drain_queue",
229
                      help="Strips a queue from all outstanding messages")
230

    
231
    return parser.parse_args(args)
232

    
233

    
234
def purge_queues():
235
    """
236
        Delete declared queues from RabbitMQ. Use with care!
237
    """
238
    global QUEUES, BINDINGS
239
    client = AMQPClient()
240
    client.connect()
241

    
242
    print "Queues to be deleted: ", QUEUES
243

    
244
    if not get_user_confirmation():
245
        return
246

    
247
    for queue in QUEUES:
248
        result = client.queue_delete(queue=queue)
249
        print "Deleting queue %s. Result: %s" % (queue, result)
250

    
251
    client.close()
252

    
253

    
254
def purge_exchanges():
255
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
256
    global QUEUES, BINDINGS
257
    purge_queues()
258

    
259
    client = AMQPClient()
260
    client.connect()
261

    
262
    print "Exchanges to be deleted: ", settings.EXCHANGES
263

    
264
    if not get_user_confirmation():
265
        return
266

    
267
    for exchange in settings.EXCHANGES:
268
        result = client.exchange_delete(exchange=exchange)
269
        print "Deleting exchange %s. Result: %s" % (exchange, result)
270

    
271
    client.close()
272

    
273

    
274
def drain_queue(queue):
275
    """Strip a (declared) queue from all outstanding messages"""
276
    global QUEUES, BINDINGS
277
    if not queue:
278
        return
279

    
280
    if not queue in QUEUES:
281
        print "Queue %s not configured" % queue
282
        return
283

    
284
    print "Queue to be drained: %s" % queue
285

    
286
    if not get_user_confirmation():
287
        return
288

    
289
    client = AMQPClient()
290
    client.connect()
291

    
292
    # Register a temporary queue binding
293
    for binding in BINDINGS:
294
        if binding[0] == queue:
295
            exch = binding[1]
296

    
297
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
298

    
299
    print "Queue draining about to start, hit Ctrl+c when done"
300
    time.sleep(2)
301
    print "Queue draining starting"
302

    
303
    signal(SIGTERM, _exit_handler)
304
    signal(SIGINT, _exit_handler)
305

    
306
    num_processed = 0
307
    while True:
308
        client.basic_wait()
309
        num_processed += 1
310
        sys.stderr.write("Ignored %d messages\r" % num_processed)
311

    
312
    client.basic_cancel(tag)
313
    client.close()
314

    
315

    
316

    
317
def get_user_confirmation():
318
    ans = raw_input("Are you sure (N/y):")
319

    
320
    if not ans:
321
        return False
322
    if ans not in ['Y', 'y']:
323
        return False
324
    return True
325

    
326

    
327
def debug_mode():
328
    disp = Dispatcher(debug=True)
329
    signal(SIGINT, _exit_handler)
330
    signal(SIGTERM, _exit_handler)
331

    
332
    disp.wait()
333

    
334

    
335
def daemon_mode(opts):
336
    global children
337

    
338
    # Create pidfile,
339
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
340

    
341
    if daemon.runner.is_pidfile_stale(pidf):
342
        log.warning("Removing stale PID lock file %s", pidf.path)
343
        pidf.break_lock()
344

    
345
    try:
346
        pidf.acquire()
347
    except (pidlockfile.AlreadyLocked, LockTimeout):
348
        log.critical("Failed to lock pidfile %s, another instance running?",
349
                     pidf.path)
350
        sys.exit(1)
351

    
352
    log.info("Became a daemon")
353

    
354
    # Fork workers
355
    children = []
356

    
357
    i = 0
358
    while i < opts.workers:
359
        newpid = os.fork()
360

    
361
        if newpid == 0:
362
            signal(SIGINT, _exit_handler)
363
            signal(SIGTERM, _exit_handler)
364
            child(sys.argv[1:])
365
            sys.exit(1)
366
        else:
367
            log.debug("%d, forked child: %d", os.getpid(), newpid)
368
            children.append(newpid)
369
        i += 1
370

    
371
    # Catch signals to ensure graceful shutdown
372
    signal(SIGINT, _parent_handler)
373
    signal(SIGTERM, _parent_handler)
374

    
375
    # Wait for all children processes to die, one by one
376
    try:
377
        for pid in children:
378
            try:
379
                os.waitpid(pid, 0)
380
            except Exception:
381
                pass
382
    finally:
383
        pidf.release()
384

    
385

    
386
def main():
387
    (opts, args) = parse_arguments(sys.argv[1:])
388

    
389
    dictConfig(settings.DISPATCHER_LOGGING)
390

    
391
    global log
392

    
393
    # Init the global variables containing the queues
394
    _init_queues()
395

    
396
    # Special case for the clean up queues action
397
    if opts.purge_queues:
398
        purge_queues()
399
        return
400

    
401
    # Special case for the clean up exch action
402
    if opts.purge_exchanges:
403
        purge_exchanges()
404
        return
405

    
406
    if opts.drain_queue:
407
        drain_queue(opts.drain_queue)
408
        return
409

    
410
    # Debug mode, process messages without spawning workers
411
    if opts.debug:
412
        debug_mode()
413
        return
414

    
415
    files_preserve = []
416
    for handler in log.handlers:
417
        stream = getattr(handler, 'stream')
418
        if stream and hasattr(stream, 'fileno'):
419
            files_preserve.append(handler.stream)
420

    
421
    daemon_context = daemon.daemon.DaemonContext(
422
        files_preserve=files_preserve,
423
        umask=022)
424

    
425
    daemon_context.open()
426

    
427
    # Catch every exception, make sure it gets logged properly
428
    try:
429
        daemon_mode(opts)
430
    except Exception:
431
        log.exception("Unknown error")
432
        raise
433

    
434

    
435
if __name__ == "__main__":
436
    sys.exit(main())
437

    
438
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :