Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ e6209aa2

History | View | Annotate | Download (11.7 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup, dispatch and admin
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39

    
40
from django.core.management import setup_environ
41

    
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
import synnefo.settings as settings
47
from synnefo.logic import log
48

    
49
setup_environ(settings)
50

    
51
from amqplib import client_0_8 as amqp
52
from signal import signal, SIGINT, SIGTERM
53

    
54
import time
55
import socket
56
from daemon import daemon
57

    
58
# Take care of differences between python-daemon versions.
59
try:
60
    from daemon import pidfile
61
except:
62
    from daemon import pidlockfile
63

    
64
from synnefo.logic import callbacks
65

    
66
class Dispatcher:
67

    
68
    logger = None
69
    chan = None
70
    debug = False
71
    clienttags = []
72

    
73
    def __init__(self, debug = False):
74
        
75
        # Initialize logger
76
        self.logger = log.get_logger('synnefo.dispatcher')
77

    
78
        self.debug = debug
79
        self._init()
80

    
81
    def wait(self):
82
        while True:
83
            try:
84
                self.chan.wait()
85
            except SystemExit:
86
                break
87
            except amqp.exceptions.AMQPConnectionException:
88
                self.logger.error("Server went away, reconnecting...")
89
                self._init()
90
            except socket.error:
91
                self.logger.error("Server went away, reconnecting...")
92
                self._init()
93

    
94
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
95
        self.chan.connection.close()
96
        self.chan.close()
97

    
98
    def _init(self):
99
        self.logger.info("Initializing")
100
        
101
        # Connect to RabbitMQ
102
        conn = None
103
        while conn == None:
104
            self.logger.info("Attempting to connect to %s",
105
                             settings.RABBIT_HOST)
106
            try:
107
                conn = amqp.Connection(host=settings.RABBIT_HOST,
108
                                       userid=settings.RABBIT_USERNAME,
109
                                       password=settings.RABBIT_PASSWORD,
110
                                       virtual_host=settings.RABBIT_VHOST)
111
            except socket.error:
112
                time.sleep(1)
113

    
114
        self.logger.info("Connection succesful, opening channel")
115
        self.chan = conn.channel()
116

    
117
        # Declare queues and exchanges
118
        for exchange in settings.EXCHANGES:
119
            self.chan.exchange_declare(exchange=exchange, type="topic",
120
                                       durable=True, auto_delete=False)
121

    
122
        for queue in settings.QUEUES:
123
            self.chan.queue_declare(queue=queue, durable=True,
124
                                    exclusive=False, auto_delete=False)
125

    
126
        bindings = settings.BINDINGS
127

    
128
        # Special queue for debugging, should not appear in production
129
        if self.debug:
130
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
131
                                    exclusive=False, auto_delete=False)
132
            bindings += settings.BINDINGS_DEBUG
133

    
134
        # Bind queues to handler methods
135
        for binding in bindings:
136
            try:
137
                callback = getattr(callbacks, binding[3])
138
            except AttributeError:
139
                self.logger.error("Cannot find callback %s" % binding[3])
140
                continue
141

    
142
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
143
                                 routing_key=binding[2])
144
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
145
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
146
                              (binding[1], binding[2], binding[0], binding[3]))
147
            self.clienttags.append(tag)
148

    
149

    
150
def _exit_handler(signum, frame):
151
    """"Catch exit signal in children processes."""
152
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
153
    raise SystemExit
154

    
155

    
156
def _parent_handler(signum, frame):
157
    """"Catch exit signal in parent process and forward it to children."""
158
    global children
159
    print "Caught signal %d, sending kill signal to children" % signum
160
    [os.kill(pid, SIGTERM) for pid in children]
161

    
162

    
163
def child(cmdline):
164
    """The context of the child process"""
165

    
166
    # Cmd line argument parsing
167
    (opts, args) = parse_arguments(cmdline)
168
    disp = Dispatcher(debug = opts.debug)
169

    
170
    # Start the event loop
171
    disp.wait()
172

    
173

    
174
def parse_arguments(args):
175
    from optparse import OptionParser
176

    
177
    parser = OptionParser()
178
    parser.add_option("-d", "--debug", action="store_true", default=False,
179
                      dest="debug", help="Enable debug mode")
180
    parser.add_option("-w", "--workers", default=2, dest="workers",
181
                      help="Number of workers to spawn", type="int")
182
    parser.add_option("-p", '--pid-file', dest="pid_file",
183
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
184
                      help="Save PID to file (default:%s)" %
185
                           os.path.join(os.getcwd(), "dispatcher.pid"))
186
    parser.add_option("--purge-queues", action="store_true",
187
                      default=False, dest="purge_queues",
188
                      help="Remove all declared queues (DANGEROUS!)")
189
    parser.add_option("--purge-exchanges", action="store_true",
190
                      default=False, dest="purge_exchanges",
191
                      help="Remove all exchanges. Implies deleting all queues \
192
                           first (DANGEROUS!)")
193
    parser.add_option("--drain-queue", dest="queue",
194
                      help="Strips a queue from all outstanding messages")
195

    
196
    return parser.parse_args(args)
197

    
198

    
199
def purge_queues() :
200
    """
201
        Delete declared queues from RabbitMQ. Use with care!
202
    """
203
    conn = get_connection()
204
    chan = conn.channel()
205

    
206
    print "Queues to be deleted: ",  settings.QUEUES
207

    
208
    if not get_user_confirmation():
209
        return
210

    
211
    for queue in settings.QUEUES:
212
        try:
213
            chan.queue_delete(queue=queue)
214
            print "Deleting queue %s" % queue
215
        except amqp.exceptions.AMQPChannelException as e:
216
            print e.amqp_reply_code, " ", e.amqp_reply_text
217
            chan = conn.channel()
218

    
219
    chan.connection.close()
220

    
221

    
222
def purge_exchanges():
223
    """
224
        Delete declared exchanges from RabbitMQ, after removing all queues first
225
    """
226
    purge_queues()
227

    
228
    conn = get_connection()
229
    chan = conn.channel()
230

    
231
    print "Exchnages to be deleted: ", settings.EXCHANGES
232

    
233
    if not get_user_confirmation():
234
        return
235

    
236
    for exchange in settings.EXCHANGES:
237
        try:
238
            chan.exchange_delete(exchange=exchange)
239
        except amqp.exceptions.AMQPChannelException as e:
240
            print e.amqp_reply_code, " ", e.amqp_reply_text
241

    
242
    chan.connection.close()
243

    
244

    
245
def drain_queue(queue):
246
    """
247
        Strip a (declared) queue from all outstanding messages
248
    """
249
    if not queue:
250
        return
251

    
252
    if not queue in settings.QUEUES:
253
        print "Queue %s not configured" % queue
254
        return
255

    
256
    print "Queue to be drained: %s" % queue
257

    
258
    if not get_user_confirmation():
259
        return
260
    conn = get_connection()
261
    chan = conn.channel()
262

    
263
    # Register a temporary queue binding
264
    for binding in settings.BINDINGS:
265
        if binding[0] == queue:
266
            exch = binding[1]
267

    
268
    if not exch:
269
        print "Queue not bound to any exchange: %s" % queue
270
        return
271

    
272
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
273
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
274

    
275
    print "Queue draining about to start, hit Ctrl+c when done"
276
    time.sleep(2)
277
    print "Queue draining starting"
278

    
279
    signal(SIGTERM, _exit_handler)
280
    signal(SIGINT, _exit_handler)
281

    
282
    while True:
283
        chan.wait()
284
    chan.basic_cancel(tag)
285
    chan.connection.close()
286

    
287
def get_connection():
288
    conn = amqp.Connection( host=settings.RABBIT_HOST,
289
                        userid=settings.RABBIT_USERNAME,
290
                        password=settings.RABBIT_PASSWORD,
291
                        virtual_host=settings.RABBIT_VHOST)
292
    return conn
293

    
294
def get_user_confirmation():
295
    ans = raw_input("Are you sure (N/y):")
296

    
297
    if not ans:
298
        return False
299
    if ans not in ['Y', 'y']:
300
        return False
301
    return True
302

    
303

    
304
def debug_mode():
305
    disp = Dispatcher(debug = True)
306
    signal(SIGINT, _exit_handler)
307
    signal(SIGTERM, _exit_handler)
308

    
309
    disp.wait()
310

    
311

    
312
def main():
313
    global children, logger
314
    (opts, args) = parse_arguments(sys.argv[1:])
315

    
316
    logger = log.get_logger("synnefo.dispatcher")
317

    
318
    # Special case for the clean up queues action
319
    if opts.purge_queues:
320
        purge_queues()
321
        return
322

    
323
    # Special case for the clean up exch action
324
    if opts.purge_exchanges:
325
        purge_exchanges()
326
        return
327

    
328
    if opts.clean_queue:
329
        drain_queue(opts.queue)
330
        return
331

    
332
    # Debug mode, process messages without spawning workers
333
    if opts.debug:
334
        debug_mode()
335
        return
336

    
337
    # Become a daemon
338
    daemon_context = daemon.DaemonContext(
339
        stdout=sys.stdout,
340
        stderr=sys.stderr,
341
        umask=022)
342

    
343
    daemon_context.open()
344

    
345
    # Create pidfile. Take care of differences between python-daemon versions.
346
    try:
347
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
348
    except:
349
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
350

    
351
    pidf.acquire()
352

    
353
    logger.info("Became a daemon")
354

    
355
    # Fork workers
356
    children = []
357

    
358
    i = 0
359
    while i < opts.workers:
360
        newpid = os.fork()
361

    
362
        if newpid == 0:
363
            signal(SIGINT,  _exit_handler)
364
            signal(SIGTERM, _exit_handler)
365
            child(sys.argv[1:])
366
            sys.exit(1)
367
        else:
368
            pids = (os.getpid(), newpid)
369
            logger.debug("%d, forked child: %d" % pids)
370
            children.append(pids[1])
371
        i += 1
372

    
373
    # Catch signals to ensure graceful shutdown
374
    signal(SIGINT,  _parent_handler)
375
    signal(SIGTERM, _parent_handler)
376

    
377
    # Wait for all children processes to die, one by one
378
    try :
379
        for pid in children:
380
            try:
381
                os.waitpid(pid, 0)
382
            except Exception:
383
                pass
384
    finally:
385
        pidf.release()
386

    
387
if __name__ == "__main__":
388
    sys.exit(main())
389

    
390
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :