Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 979482ce

History | View | Annotate | Download (11.1 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup and dispatch
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39

    
40
from django.core.management import setup_environ
41

    
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
import synnefo.settings as settings
47
from synnefo.logic import log
48

    
49
setup_environ(settings)
50

    
51
from amqplib import client_0_8 as amqp
52
from signal import signal, SIGINT, SIGTERM
53

    
54
import time
55
import socket
56
from daemon import daemon
57

    
58
# Take care of differences between python-daemon versions.
59
try:
60
    from daemon import pidfile
61
except:
62
    from daemon import pidlockfile
63

    
64
from synnefo.logic import callbacks
65

    
66
class Dispatcher:
67

    
68
    logger = None
69
    chan = None
70
    debug = False
71
    clienttags = []
72

    
73
    def __init__(self, debug = False):
74
        
75
        # Initialize logger
76
        self.logger = log.get_logger('synnefo.dispatcher')
77

    
78
        self.debug = debug
79
        self._init()
80

    
81
    def wait(self):
82
        while True:
83
            try:
84
                self.chan.wait()
85
            except SystemExit:
86
                break
87
            except amqp.exceptions.AMQPConnectionException:
88
                self.logger.error("Server went away, reconnecting...")
89
                self._init()
90
            except socket.error:
91
                self.logger.error("Server went away, reconnecting...")
92
                self._init()
93

    
94
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
95
        self.chan.connection.close()
96
        self.chan.close()
97

    
98
    def _init(self):
99
        self.logger.info("Initializing")
100
        
101
        # Connect to RabbitMQ
102
        conn = None
103
        while conn == None:
104
            self.logger.info("Attempting to connect to %s",
105
                             settings.RABBIT_HOST)
106
            try:
107
                conn = amqp.Connection(host=settings.RABBIT_HOST,
108
                                       userid=settings.RABBIT_USERNAME,
109
                                       password=settings.RABBIT_PASSWORD,
110
                                       virtual_host=settings.RABBIT_VHOST)
111
            except socket.error:
112
                time.sleep(1)
113

    
114
        self.logger.info("Connection succesful, opening channel")
115
        self.chan = conn.channel()
116

    
117
        # Declare queues and exchanges
118
        for exchange in settings.EXCHANGES:
119
            self.chan.exchange_declare(exchange=exchange, type="topic",
120
                                       durable=True, auto_delete=False)
121

    
122
        for queue in settings.QUEUES:
123
            self.chan.queue_declare(queue=queue, durable=True,
124
                                    exclusive=False, auto_delete=False)
125

    
126
        bindings = settings.BINDINGS
127

    
128
        # Special queue for debugging, should not appear in production
129
        if self.debug:
130
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
131
                                    exclusive=False, auto_delete=False)
132
            bindings += settings.BINDINGS_DEBUG
133

    
134
        # Bind queues to handler methods
135
        for binding in bindings:
136
            try:
137
                callback = getattr(callbacks, binding[3])
138
            except AttributeError:
139
                self.logger.error("Cannot find callback %s" % binding[3])
140
                continue
141

    
142
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
143
                                 routing_key=binding[2])
144
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
145
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
146
                              (binding[1], binding[2], binding[0], binding[3]))
147
            self.clienttags.append(tag)
148

    
149

    
150
def _exit_handler(signum, frame):
151
    """"Catch exit signal in children processes."""
152
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
153
    raise SystemExit
154

    
155

    
156
def _parent_handler(signum, frame):
157
    """"Catch exit signal in parent process and forward it to children."""
158
    global children
159
    print "Caught signal %d, sending kill signal to children" % signum
160
    [os.kill(pid, SIGTERM) for pid in children]
161

    
162

    
163
def child(cmdline):
164
    """The context of the child process"""
165

    
166
    # Cmd line argument parsing
167
    (opts, args) = parse_arguments(cmdline)
168
    disp = Dispatcher(debug = opts.debug)
169

    
170
    # Start the event loop
171
    disp.wait()
172

    
173

    
174
def parse_arguments(args):
175
    from optparse import OptionParser
176

    
177
    parser = OptionParser()
178
    parser.add_option("-d", "--debug", action="store_true", default=False,
179
                      dest="debug", help="Enable debug mode")
180
    parser.add_option("--purge-queues", action="store_true",
181
                      default=False, dest="purge_queues",
182
                      help="Remove all declared queues (DANGEROUS!)")
183
    parser.add_option("--purge-exchanges", action="store_true",
184
                      default=False, dest="purge_exchanges",
185
                      help="Remove all exchanges. Implies deleting all queues \
186
                           first (DANGEROUS!)")
187
    parser.add_option("--drain-queue", dest="clean_queue",
188
                      help="Acks and removes all messages from a queue")
189
    parser.add_option("-w", "--workers", default=2, dest="workers",
190
                      help="Number of workers to spawn", type="int")
191
    parser.add_option("-p", '--pid-file', dest="pid_file",
192
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
193
                      help="Save PID to file (default:%s)" %
194
                           os.path.join(os.getcwd(), "dispatcher.pid"))
195

    
196
    return parser.parse_args(args)
197

    
198

    
199
def purge_queues() :
200
    """
201
        Delete declared queues from RabbitMQ. Use with care!
202
    """
203
    conn = get_connection()
204
    chan = conn.channel()
205

    
206
    print "Queues to be deleted: ",  settings.QUEUES
207

    
208
    if not get_user_confirmation():
209
        return
210

    
211
    for queue in settings.QUEUES:
212
        try:
213
            chan.queue_delete(queue=queue)
214
            print "Deleting queue %s" % queue
215
        except amqp.exceptions.AMQPChannelException as e:
216
            print e.amqp_reply_code, " ", e.amqp_reply_text
217
            chan = conn.channel()
218

    
219
    chan.connection.close()
220

    
221

    
222
def purge_exchanges():
223
    """
224
        Delete declared exchanges from RabbitMQ, after removing all queues first
225
    """
226
    purge_queues()
227

    
228
    conn = get_connection()
229
    chan = conn.channel()
230

    
231
    print "Exchnages to be deleted: ", settings.EXCHANGES
232

    
233
    if not get_user_confirmation():
234
        return
235

    
236
    for exchange in settings.EXCHANGES:
237
        try:
238
            chan.exchange_delete(exchange=exchange)
239
        except amqp.exceptions.AMQPChannelException as e:
240
            print e.amqp_reply_code, " ", e.amqp_reply_text
241

    
242
    chan.connection.close()
243

    
244

    
245
def drain_queue(queue):
246
    """
247
        Strip a (declared) queue from all outstanding messages
248
    """
249
    if not queue:
250
        return
251

    
252
    if not queue in settings.QUEUES:
253
        print "Queue %s not configured" % queue
254
        return
255

    
256
    print "Queue to be drained: %s" % queue
257

    
258
    if not get_user_confirmation():
259
        return
260
    conn = get_connection()
261
    chan = conn.channel()
262

    
263
    chan.connection.close()
264

    
265
def get_connection():
266
    conn = amqp.Connection( host=settings.RABBIT_HOST,
267
                        userid=settings.RABBIT_USERNAME,
268
                        password=settings.RABBIT_PASSWORD,
269
                        virtual_host=settings.RABBIT_VHOST)
270
    return conn
271

    
272
def get_user_confirmation():
273
    ans = raw_input("Are you sure (N/y):")
274

    
275
    if not ans:
276
        return False
277
    if ans not in ['Y', 'y']:
278
        return False
279
    return True
280

    
281

    
282
def debug_mode():
283
    disp = Dispatcher(debug = True)
284
    signal(SIGINT, _exit_handler)
285
    signal(SIGTERM, _exit_handler)
286

    
287
    disp.wait()
288

    
289

    
290
def main():
291
    global children, logger
292
    (opts, args) = parse_arguments(sys.argv[1:])
293

    
294
    logger = log.get_logger("synnefo.dispatcher")
295

    
296
    # Special case for the clean up queues action
297
    if opts.purge_queues:
298
        purge_queues()
299
        return
300

    
301
    # Special case for the clean up exch action
302
    if opts.purge_exchanges:
303
        purge_exchanges()
304
        return
305

    
306
    if opts.clean_queue:
307
        drain_queue(opts.clean_queue)
308
        return
309

    
310
    # Debug mode, process messages without spawning workers
311
    if opts.debug:
312
        debug_mode()
313
        return
314

    
315
    # Become a daemon
316
    daemon_context = daemon.DaemonContext(
317
        stdout=sys.stdout,
318
        stderr=sys.stderr,
319
        umask=022)
320

    
321
    daemon_context.open()
322

    
323
    # Create pidfile. Take care of differences between python-daemon versions.
324
    try:
325
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
326
    except:
327
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
328

    
329
    pidf.acquire()
330

    
331
    logger.info("Became a daemon")
332

    
333
    # Fork workers
334
    children = []
335

    
336
    i = 0
337
    while i < opts.workers:
338
        newpid = os.fork()
339

    
340
        if newpid == 0:
341
            signal(SIGINT,  _exit_handler)
342
            signal(SIGTERM, _exit_handler)
343
            child(sys.argv[1:])
344
            sys.exit(1)
345
        else:
346
            pids = (os.getpid(), newpid)
347
            logger.debug("%d, forked child: %d" % pids)
348
            children.append(pids[1])
349
        i += 1
350

    
351
    # Catch signals to ensure graceful shutdown
352
    signal(SIGINT,  _parent_handler)
353
    signal(SIGTERM, _parent_handler)
354

    
355
    # Wait for all children processes to die, one by one
356
    try :
357
        for pid in children:
358
            try:
359
                os.waitpid(pid, 0)
360
            except Exception:
361
                pass
362
    finally:
363
        pidf.release()
364

    
365
if __name__ == "__main__":
366
    sys.exit(main())
367

    
368
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :