Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ c626e1d0

History | View | Annotate | Download (11.8 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39
from django.core.management import setup_environ
40

    
41
import sys
42
import os
43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44
sys.path.append(path)
45
import synnefo.settings as settings
46
from synnefo.logic import log
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import time
54
import socket
55
from daemon import daemon
56

    
57
# Take care of differences between python-daemon versions.
58
try:
59
    from daemon import pidfile
60
except:
61
    from daemon import pidlockfile
62

    
63
from synnefo.logic import callbacks
64

    
65
class Dispatcher:
66

    
67
    logger = None
68
    chan = None
69
    debug = False
70
    clienttags = []
71

    
72
    def __init__(self, debug = False):
73
        
74
        # Initialize logger
75
        self.logger = log.get_logger('synnefo.dispatcher')
76

    
77
        self.debug = debug
78
        self._init()
79

    
80
    def wait(self):
81
        while True:
82
            try:
83
                self.chan.wait()
84
            except SystemExit:
85
                break
86
            except amqp.exceptions.AMQPConnectionException:
87
                self.logger.error("Server went away, reconnecting...")
88
                self._init()
89
            except socket.error:
90
                self.logger.error("Server went away, reconnecting...")
91
                self._init()
92

    
93
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
94
        self.chan.connection.close()
95
        self.chan.close()
96

    
97
    def _init(self):
98
        self.logger.info("Initializing")
99
        
100
        # Connect to RabbitMQ
101
        conn = None
102
        while conn == None:
103
            self.logger.info("Attempting to connect to %s",
104
                             settings.RABBIT_HOST)
105
            try:
106
                conn = amqp.Connection(host=settings.RABBIT_HOST,
107
                                       userid=settings.RABBIT_USERNAME,
108
                                       password=settings.RABBIT_PASSWORD,
109
                                       virtual_host=settings.RABBIT_VHOST)
110
            except socket.error:
111
                time.sleep(1)
112

    
113
        self.logger.info("Connection succesful, opening channel")
114
        self.chan = conn.channel()
115

    
116
        # Declare queues and exchanges
117
        for exchange in settings.EXCHANGES:
118
            self.chan.exchange_declare(exchange=exchange, type="topic",
119
                                       durable=True, auto_delete=False)
120

    
121
        for queue in settings.QUEUES:
122
            self.chan.queue_declare(queue=queue, durable=True,
123
                                    exclusive=False, auto_delete=False)
124

    
125
        bindings = settings.BINDINGS
126

    
127
        # Special queue for debugging, should not appear in production
128
        if self.debug:
129
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
130
                                    exclusive=False, auto_delete=False)
131
            bindings += settings.BINDINGS_DEBUG
132

    
133
        # Bind queues to handler methods
134
        for binding in bindings:
135
            try:
136
                callback = getattr(callbacks, binding[3])
137
            except AttributeError:
138
                self.logger.error("Cannot find callback %s" % binding[3])
139
                continue
140

    
141
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
142
                                 routing_key=binding[2])
143
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
144
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
145
                              (binding[1], binding[2], binding[0], binding[3]))
146
            self.clienttags.append(tag)
147

    
148

    
149
def _exit_handler(signum, frame):
150
    """"Catch exit signal in children processes."""
151
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
152
    raise SystemExit
153

    
154

    
155
def _parent_handler(signum, frame):
156
    """"Catch exit signal in parent process and forward it to children."""
157
    global children
158
    print "Caught signal %d, sending kill signal to children" % signum
159
    [os.kill(pid, SIGTERM) for pid in children]
160

    
161

    
162
def child(cmdline):
163
    """The context of the child process"""
164

    
165
    # Cmd line argument parsing
166
    (opts, args) = parse_arguments(cmdline)
167
    disp = Dispatcher(debug = opts.debug)
168

    
169
    # Start the event loop
170
    disp.wait()
171

    
172

    
173
def parse_arguments(args):
174
    from optparse import OptionParser
175

    
176
    parser = OptionParser()
177
    parser.add_option("-d", "--debug", action="store_true", default=False,
178
                      dest="debug", help="Enable debug mode")
179
    parser.add_option("-w", "--workers", default=2, dest="workers",
180
                      help="Number of workers to spawn", type="int")
181
    parser.add_option("-p", '--pid-file', dest="pid_file",
182
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
183
                      help="Save PID to file (default:%s)" %
184
                           os.path.join(os.getcwd(), "dispatcher.pid"))
185
    parser.add_option("--purge-queues", action="store_true",
186
                      default=False, dest="purge_queues",
187
                      help="Remove all declared queues (DANGEROUS!)")
188
    parser.add_option("--purge-exchanges", action="store_true",
189
                      default=False, dest="purge_exchanges",
190
                      help="Remove all exchanges. Implies deleting all queues \
191
                           first (DANGEROUS!)")
192
    parser.add_option("--drain-queue", dest="drain_queue",
193
                      help="Strips a queue from all outstanding messages")
194

    
195
    return parser.parse_args(args)
196

    
197

    
198
def purge_queues() :
199
    """
200
        Delete declared queues from RabbitMQ. Use with care!
201
    """
202
    conn = get_connection()
203
    chan = conn.channel()
204

    
205
    print "Queues to be deleted: ",  settings.QUEUES
206

    
207
    if not get_user_confirmation():
208
        return
209

    
210
    for queue in settings.QUEUES:
211
        try:
212
            chan.queue_delete(queue=queue)
213
            print "Deleting queue %s" % queue
214
        except amqp.exceptions.AMQPChannelException as e:
215
            print e.amqp_reply_code, " ", e.amqp_reply_text
216
            chan = conn.channel()
217

    
218
    chan.connection.close()
219

    
220

    
221
def purge_exchanges():
222
    """
223
        Delete declared exchanges from RabbitMQ, after removing all queues first
224
    """
225
    purge_queues()
226

    
227
    conn = get_connection()
228
    chan = conn.channel()
229

    
230
    print "Exchnages to be deleted: ", settings.EXCHANGES
231

    
232
    if not get_user_confirmation():
233
        return
234

    
235
    for exchange in settings.EXCHANGES:
236
        try:
237
            chan.exchange_delete(exchange=exchange)
238
        except amqp.exceptions.AMQPChannelException as e:
239
            print e.amqp_reply_code, " ", e.amqp_reply_text
240

    
241
    chan.connection.close()
242

    
243

    
244
def drain_queue(queue):
245
    """
246
        Strip a (declared) queue from all outstanding messages
247
    """
248
    if not queue:
249
        return
250

    
251
    if not queue in settings.QUEUES:
252
        print "Queue %s not configured" % queue
253
        return
254

    
255
    print "Queue to be drained: %s" % queue
256

    
257
    if not get_user_confirmation():
258
        return
259
    conn = get_connection()
260
    chan = conn.channel()
261

    
262
    # Register a temporary queue binding
263
    for binding in settings.BINDINGS:
264
        if binding[0] == queue:
265
            exch = binding[1]
266

    
267
    if not exch:
268
        print "Queue not bound to any exchange: %s" % queue
269
        return
270

    
271
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
272
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
273

    
274
    print "Queue draining about to start, hit Ctrl+c when done"
275
    time.sleep(2)
276
    print "Queue draining starting"
277

    
278
    signal(SIGTERM, _exit_handler)
279
    signal(SIGINT, _exit_handler)
280

    
281
    num_processed = 0
282
    while True:
283
        chan.wait()
284
        num_processed += 1
285
        sys.stderr.write("Ignored %d messages\r" % num_processed)
286

    
287
    chan.basic_cancel(tag)
288
    chan.connection.close()
289

    
290
def get_connection():
291
    conn = amqp.Connection( host=settings.RABBIT_HOST,
292
                        userid=settings.RABBIT_USERNAME,
293
                        password=settings.RABBIT_PASSWORD,
294
                        virtual_host=settings.RABBIT_VHOST)
295
    return conn
296

    
297
def get_user_confirmation():
298
    ans = raw_input("Are you sure (N/y):")
299

    
300
    if not ans:
301
        return False
302
    if ans not in ['Y', 'y']:
303
        return False
304
    return True
305

    
306

    
307
def debug_mode():
308
    disp = Dispatcher(debug = True)
309
    signal(SIGINT, _exit_handler)
310
    signal(SIGTERM, _exit_handler)
311

    
312
    disp.wait()
313

    
314

    
315
def main():
316
    global children, logger
317
    (opts, args) = parse_arguments(sys.argv[1:])
318

    
319
    logger = log.get_logger("synnefo.dispatcher")
320

    
321
    # Special case for the clean up queues action
322
    if opts.purge_queues:
323
        purge_queues()
324
        return
325

    
326
    # Special case for the clean up exch action
327
    if opts.purge_exchanges:
328
        purge_exchanges()
329
        return
330

    
331
    if opts.drain_queue:
332
        drain_queue(opts.drain_queue)
333
        return
334

    
335
    # Debug mode, process messages without spawning workers
336
    if opts.debug:
337
        debug_mode()
338
        return
339

    
340
    # Become a daemon
341
    daemon_context = daemon.DaemonContext(
342
        stdout=sys.stdout,
343
        stderr=sys.stderr,
344
        umask=022)
345

    
346
    daemon_context.open()
347

    
348
    # Create pidfile. Take care of differences between python-daemon versions.
349
    try:
350
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
351
    except:
352
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
353

    
354
    pidf.acquire()
355

    
356
    logger.info("Became a daemon")
357

    
358
    # Fork workers
359
    children = []
360

    
361
    i = 0
362
    while i < opts.workers:
363
        newpid = os.fork()
364

    
365
        if newpid == 0:
366
            signal(SIGINT,  _exit_handler)
367
            signal(SIGTERM, _exit_handler)
368
            child(sys.argv[1:])
369
            sys.exit(1)
370
        else:
371
            pids = (os.getpid(), newpid)
372
            logger.debug("%d, forked child: %d" % pids)
373
            children.append(pids[1])
374
        i += 1
375

    
376
    # Catch signals to ensure graceful shutdown
377
    signal(SIGINT,  _parent_handler)
378
    signal(SIGTERM, _parent_handler)
379

    
380
    # Wait for all children processes to die, one by one
381
    try :
382
        for pid in children:
383
            try:
384
                os.waitpid(pid, 0)
385
            except Exception:
386
                pass
387
    finally:
388
        pidf.release()
389

    
390
if __name__ == "__main__":
391
    sys.exit(main())
392

    
393
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :