Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ c4e55622

History | View | Annotate | Download (12.3 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45

    
46
from synnefo import settings
47
setup_environ(settings)
48

    
49
from amqplib import client_0_8 as amqp
50
from signal import signal, SIGINT, SIGTERM
51

    
52
import logging
53
import time
54
import socket
55
from daemon import daemon
56

    
57
from synnefo.lib.amqp import AMQPClient
58

    
59
# Take care of differences between python-daemon versions.
60
try:
61
    from daemon import pidfile
62
except:
63
    from daemon import pidlockfile
64

    
65
from synnefo.logic import callbacks
66
from synnefo.util.dictconfig import dictConfig
67

    
68

    
69
log = logging.getLogger()
70

    
71

    
72
# Queue names
73
QUEUES = []
74

    
75
# Queue bindings to exchanges
76
BINDINGS = []
77

    
78

    
79
class Dispatcher:
80
    debug = False
81
    client_promises = []
82

    
83
    def __init__(self, debug=False):
84
        self.debug = debug
85
        self._init()
86

    
87
    def wait(self):
88
        log.info("Waiting for messages..")
89
        while True:
90
            try:
91
                self.client.basic_wait()
92
            except SystemExit:
93
                break
94
            except Exception as e:
95
                log.exception("Caught unexpected exception: %s", e)
96

    
97
        self.client.basic_cancel()
98
        self.client.close()
99

    
100
    def _init(self):
101
        global QUEUES, BINDINGS
102
        log.info("Initializing")
103

    
104
        self.client = AMQPClient()
105
        # Connect to AMQP host
106
        self.client.connect()
107

    
108
        # Declare queues and exchanges
109
        for exchange in settings.EXCHANGES:
110
            self.client.exchange_declare(exchange_name=exchange,
111
                                         exchange_type="topic")
112

    
113
        for queue in QUEUES:
114
            # Queues are mirrored to all RabbitMQ brokers
115
            self.client.queue_declare(queue=queue,mirrored=True)
116

    
117
        bindings = BINDINGS
118

    
119
        # Bind queues to handler methods
120
        for binding in bindings:
121
            try:
122
                callback = getattr(callbacks, binding[3])
123
            except AttributeError:
124
                log.error("Cannot find callback %s", binding[3])
125
                raise SystemExit(1)
126

    
127
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
128
                                   routing_key=binding[2])
129

    
130
            consume_promise = self.client.basic_consume(queue=binding[0],
131
                                                        callback=callback)
132

    
133
            log.debug("Binding %s(%s) to queue %s with handler %s",
134
                      binding[1], binding[2], binding[0], binding[3])
135
            self.client_promises.append(consume_promise)
136

    
137

    
138
def _init_queues():
139
    global QUEUES, BINDINGS
140

    
141
    # Queue declarations
142
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
143

    
144
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
145
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
146
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
147
    QUEUE_RECONC = "%s-reconciliation" % prefix
148
    if settings.DEBUG is True:
149
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
150

    
151
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
152
              QUEUE_GANETI_BUILD_PROGR)
153

    
154
    # notifications of type "ganeti-op-status"
155
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
156
    # notifications of type "ganeti-net-status"
157
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
158
    # notifications of type "ganeti-create-progress"
159
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
160
    # reconciliation
161
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
162

    
163
    BINDINGS = [
164
    # Queue                   # Exchange                # RouteKey              # Handler
165
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
166
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
167
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
168
    ]
169

    
170
    if settings.DEBUG is True:
171
        BINDINGS += [
172
            # Queue       # Exchange          # RouteKey  # Handler
173
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
174
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
175
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
176
        ]
177
        QUEUES += (QUEUE_DEBUG,)
178

    
179

    
180
def _exit_handler(signum, frame):
181
    """"Catch exit signal in children processes"""
182
    log.info("Caught signal %d, will raise SystemExit", signum)
183
    raise SystemExit
184

    
185

    
186
def _parent_handler(signum, frame):
187
    """"Catch exit signal in parent process and forward it to children."""
188
    global children
189
    log.info("Caught signal %d, sending SIGTERM to children %s",
190
                signum, children)
191
    [os.kill(pid, SIGTERM) for pid in children]
192

    
193

    
194
def child(cmdline):
195
    """The context of the child process"""
196

    
197
    # Cmd line argument parsing
198
    (opts, args) = parse_arguments(cmdline)
199
    disp = Dispatcher(debug=opts.debug)
200

    
201
    # Start the event loop
202
    disp.wait()
203

    
204

    
205
def parse_arguments(args):
206
    from optparse import OptionParser
207

    
208
    default_pid_file = os.path.join("var","run","synnefo","dispatcher.pid")
209
    parser = OptionParser()
210
    parser.add_option("-d", "--debug", action="store_true", default=False,
211
                      dest="debug", help="Enable debug mode")
212
    parser.add_option("-w", "--workers", default=2, dest="workers",
213
                      help="Number of workers to spawn", type="int")
214
    parser.add_option("-p", "--pid-file", dest="pid_file",
215
                      default=default_pid_file,
216
                      help="Save PID to file (default: %s)" % default_pid_file)
217
    parser.add_option("--purge-queues", action="store_true",
218
                      default=False, dest="purge_queues",
219
                      help="Remove all declared queues (DANGEROUS!)")
220
    parser.add_option("--purge-exchanges", action="store_true",
221
                      default=False, dest="purge_exchanges",
222
                      help="Remove all exchanges. Implies deleting all queues \
223
                           first (DANGEROUS!)")
224
    parser.add_option("--drain-queue", dest="drain_queue",
225
                      help="Strips a queue from all outstanding messages")
226

    
227
    return parser.parse_args(args)
228

    
229

    
230
def purge_queues():
231
    """
232
        Delete declared queues from RabbitMQ. Use with care!
233
    """
234
    global QUEUES, BINDINGS
235
    client = AMQPClient()
236
    client.connect()
237

    
238
    print "Queues to be deleted: ", QUEUES
239

    
240
    if not get_user_confirmation():
241
        return
242

    
243
    for queue in QUEUES:
244
        result = client.queue_delete(queue=queue)
245
        print "Deleting queue %s. Result: %s" % (queue, result)
246

    
247
    client.close()
248

    
249

    
250
def purge_exchanges():
251
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
252
    global QUEUES, BINDINGS
253
    purge_queues()
254

    
255
    client = AMQPClient()
256
    client.connect()
257

    
258
    print "Exchanges to be deleted: ", settings.EXCHANGES
259

    
260
    if not get_user_confirmation():
261
        return
262

    
263
    for exchange in settings.EXCHANGES:
264
        result = client.exchange_delete(exchange=exchange)
265
        print "Deleting exchange %s. Result: %s" % (exchange, result)
266

    
267
    client.close()
268

    
269

    
270
def drain_queue(queue):
271
    """Strip a (declared) queue from all outstanding messages"""
272
    global QUEUES, BINDINGS
273
    if not queue:
274
        return
275

    
276
    if not queue in QUEUES:
277
        print "Queue %s not configured" % queue
278
        return
279

    
280
    print "Queue to be drained: %s" % queue
281

    
282
    if not get_user_confirmation():
283
        return
284

    
285
    client = AMQPClient()
286
    client.connect()
287

    
288
    # Register a temporary queue binding
289
    for binding in BINDINGS:
290
        if binding[0] == queue:
291
            exch = binding[1]
292

    
293
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
294

    
295
    print "Queue draining about to start, hit Ctrl+c when done"
296
    time.sleep(2)
297
    print "Queue draining starting"
298

    
299
    signal(SIGTERM, _exit_handler)
300
    signal(SIGINT, _exit_handler)
301

    
302
    num_processed = 0
303
    while True:
304
        client.basic_wait()
305
        num_processed += 1
306
        sys.stderr.write("Ignored %d messages\r" % num_processed)
307

    
308
    client.basic_cancel(tag)
309
    client.close()
310

    
311

    
312

    
313
def get_user_confirmation():
314
    ans = raw_input("Are you sure (N/y):")
315

    
316
    if not ans:
317
        return False
318
    if ans not in ['Y', 'y']:
319
        return False
320
    return True
321

    
322

    
323
def debug_mode():
324
    disp = Dispatcher(debug=True)
325
    signal(SIGINT, _exit_handler)
326
    signal(SIGTERM, _exit_handler)
327

    
328
    disp.wait()
329

    
330

    
331
def daemon_mode(opts):
332
    global children
333

    
334
    # Create pidfile,
335
    # take care of differences between python-daemon versions
336
    try:
337
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
338
    except:
339
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
340

    
341
    pidf.acquire()
342

    
343
    log.info("Became a daemon")
344

    
345
    # Fork workers
346
    children = []
347

    
348
    i = 0
349
    while i < opts.workers:
350
        newpid = os.fork()
351

    
352
        if newpid == 0:
353
            signal(SIGINT, _exit_handler)
354
            signal(SIGTERM, _exit_handler)
355
            child(sys.argv[1:])
356
            sys.exit(1)
357
        else:
358
            log.debug("%d, forked child: %d", os.getpid(), newpid)
359
            children.append(newpid)
360
        i += 1
361

    
362
    # Catch signals to ensure graceful shutdown
363
    signal(SIGINT, _parent_handler)
364
    signal(SIGTERM, _parent_handler)
365

    
366
    # Wait for all children processes to die, one by one
367
    try:
368
        for pid in children:
369
            try:
370
                os.waitpid(pid, 0)
371
            except Exception:
372
                pass
373
    finally:
374
        pidf.release()
375

    
376

    
377
def main():
378
    dictConfig(settings.DISPATCHER_LOGGING)
379

    
380
    global log
381

    
382
    (opts, args) = parse_arguments(sys.argv[1:])
383

    
384
    # Init the global variables containing the queues
385
    _init_queues()
386

    
387
    # Special case for the clean up queues action
388
    if opts.purge_queues:
389
        purge_queues()
390
        return
391

    
392
    # Special case for the clean up exch action
393
    if opts.purge_exchanges:
394
        purge_exchanges()
395
        return
396

    
397
    if opts.drain_queue:
398
        drain_queue(opts.drain_queue)
399
        return
400

    
401
    # Debug mode, process messages without spawning workers
402
    if opts.debug:
403
        debug_mode()
404
        return
405

    
406
    files_preserve = []
407
    for handler in log.handlers:
408
        stream = getattr(handler, 'stream')
409
        if stream and hasattr(stream, 'fileno'):
410
            files_preserve.append(handler.stream)
411

    
412
    daemon_context = daemon.DaemonContext(
413
        files_preserve=files_preserve,
414
        umask=022)
415

    
416
    daemon_context.open()
417

    
418
    # Catch every exception, make sure it gets logged properly
419
    try:
420
        daemon_mode(opts)
421
    except Exception:
422
        log.exception("Unknown error")
423
        raise
424

    
425

    
426
if __name__ == "__main__":
427
    sys.exit(main())
428

    
429
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :