Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 838239fa

History | View | Annotate | Download (7.8 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2011 Greek Research and Technology Network
4
#
5
""" Message queue setup and dispatch
6

7
This program sets up connections to the queues configured in settings.py
8
and implements the message wait and dispatch loops. Actual messages are
9
handled in the dispatched functions.
10

11
"""
12

    
13
from django.core.management import setup_environ
14

    
15
import sys
16
import os
17
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
18
sys.path.append(path)
19
import synnefo.settings as settings
20

    
21
setup_environ(settings)
22

    
23
from amqplib import client_0_8 as amqp
24
from signal import signal, SIGINT, SIGTERM
25

    
26
import logging
27
import time
28
import socket
29

    
30
from synnefo.logic import dispatcher_callbacks
31

    
32

    
33
class Dispatcher:
34

    
35
    logger = None
36
    chan = None
37
    debug = False
38
    clienttags = []
39

    
40
    def __init__(self, debug = False, logger = None):
41
        self.logger = logger
42
        self.debug = debug
43
        self._init()
44

    
45
    def wait(self):
46
        while True:
47
            try:
48
                self.chan.wait()
49
            except SystemExit:
50
                break
51
            except socket.error:
52
                self.logger.error("Server went away, reconnecting...")
53
                self._init()
54

    
55
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
56
        self.chan.connection.close()
57
        self.chan.close()
58
        sys.exit()
59

    
60
    def _init(self):
61
        # Connect to RabbitMQ
62
        conn = None
63
        while conn == None:
64
            self.logger.info("Attempting to connect to %s",
65
                             settings.RABBIT_HOST)
66
            try:
67
                conn = amqp.Connection(host=settings.RABBIT_HOST,
68
                                       userid=settings.RABBIT_USERNAME,
69
                                       password=settings.RABBIT_PASSWORD,
70
                                       virtual_host=settings.RABBIT_VHOST)
71
            except socket.error:
72
                time.sleep(1)
73

    
74
        self.logger.info("Connection succesful, opening channel")
75
        self.chan = conn.channel()
76

    
77
        # Declare queues and exchanges
78
        for exchange in settings.EXCHANGES:
79
            self.chan.exchange_declare(exchange=exchange, type="topic",
80
                                       durable=True, auto_delete=False)
81

    
82
        for queue in settings.QUEUES:
83
            self.chan.queue_declare(queue=queue, durable=True,
84
                                    exclusive=False, auto_delete=False)
85

    
86
        bindings = settings.BINDINGS
87

    
88
        # Special queue for debugging, should not appear in production
89
        if self.debug:
90
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
91
                                    exclusive=False, auto_delete=False)
92
            bindings += settings.BINDINGS_DEBUG
93

    
94
        # Bind queues to handler methods
95
        for binding in bindings:
96
            try:
97
                callback = getattr(dispatcher_callbacks, binding[3])
98
            except AttributeError:
99
                self.logger.error("Cannot find callback %s" % binding[3])
100
                continue
101

    
102
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
103
                                 routing_key=binding[2])
104
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
105
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
106
                              (binding[1], binding[2], binding[0], binding[3]))
107
            self.clienttags.append(tag)
108

    
109

    
110
def _exit_handler(signum, frame):
111
    """"Catch exit signal in children processes."""
112
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
113
    raise SystemExit
114

    
115

    
116
def _parent_handler(signum, frame):
117
    """"Catch exit signal in parent process and forward it to children."""
118
    global children
119
    print "Caught signal %d, sending kill signal to children" % signum
120
    [os.kill(pid, SIGTERM) for pid in children]
121

    
122

    
123
def child(cmdline, logger):
124
    """The context of the child process"""
125

    
126
    # Cmd line argument parsing
127
    (opts, args) = parse_arguments(cmdline)
128
    disp = Dispatcher(debug = opts.debug, logger = logger)
129

    
130
    # Start the event loop
131
    disp.wait()
132

    
133

    
134
def parse_arguments(args):
135
    from optparse import OptionParser
136

    
137
    parser = OptionParser()
138
    parser.add_option("-d", "--debug", action="store_true", default=False,
139
                      dest="debug", help="Enable debug mode")
140
    parser.add_option("-l", "--log", dest="log_file",
141
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
142
                      help="Write log to FILE instead of %s" %
143
                           settings.DISPATCHER_LOG_FILE)
144
    parser.add_option("-c", "--cleanup-queues", action="store_true",
145
                      default=False, dest="cleanup_queues",
146
                      help="Remove all queues declared in settings.py (DANGEROUS!)")
147
    parser.add_option("-w", "--workers", default=2, dest="workers",
148
                      help="Number of workers to spawn", type="int")
149
    
150
    return parser.parse_args(args)
151

    
152

    
153
def cleanup_queues() :
154
    """Delete declared queues from RabbitMQ. Use with care!"""
155
    conn = amqp.Connection( host=settings.RABBIT_HOST,
156
                            userid=settings.RABBIT_USERNAME,
157
                            password=settings.RABBIT_PASSWORD,
158
                            virtual_host=settings.RABBIT_VHOST)
159
    chan = conn.channel()
160

    
161
    print "Queues to be deleted: ",  settings.QUEUES
162
    print "Exchnages to be deleted: ", settings.EXCHANGES
163
    ans = raw_input("Are you sure (N/y):")
164

    
165
    if not ans:
166
        return
167
    if ans not in ['Y', 'y']:
168
        return
169

    
170
    for exchange in settings.EXCHANGES:
171
        try:
172
            chan.exchange_delete(exchange=exchange)
173
        except amqp.exceptions.AMQPChannelException as e:
174
            print e.amqp_reply_code, " ", e.amqp_reply_text
175

    
176
    for queue in settings.QUEUES:
177
        try:
178
            chan.queue_delete(queue=queue)
179
        except amqp.exceptions.AMQPChannelException as e:
180
            print e.amqp_reply_code, " ", e.amqp_reply_text
181
    chan.close()
182
    chan.connection.close()
183

    
184

    
185
def debug_mode(logger):
186
    disp = Dispatcher(debug = True, logger = logger)
187
    signal(SIGINT, _exit_handler)
188
    signal(SIGTERM, _exit_handler)
189

    
190
    disp.wait()
191

    
192

    
193
def main():
194
    global children, logger
195
    (opts, args) = parse_arguments(sys.argv[1:])
196

    
197
    # Initialize logger
198
    lvl = logging.DEBUG if opts.debug else logging.INFO
199
    logger = logging.getLogger("synnefo.dispatcher")
200
    logger.setLevel(lvl)
201
    formatter = logging.Formatter(
202
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
203
        "%Y-%m-%d %H:%M:%S")
204
    handler = logging.FileHandler(opts.log_file)
205
    handler.setFormatter(formatter)
206
    logger.addHandler(handler)
207

    
208
    # Special case for the clean up queues action
209
    if opts.cleanup_queues:
210
        cleanup_queues()
211
        return
212

    
213
    # Debug mode, process messages without spawning workers
214
    if opts.debug:
215
        debug_mode(logger = logger)
216
        return
217

    
218
    # Fork workers
219
    children = []
220

    
221
    i = 0
222
    while i < opts.workers:
223
        newpid = os.fork()
224

    
225
        if newpid == 0:
226
            signal(SIGINT, _exit_handler)
227
            signal(SIGTERM, _exit_handler)
228
            child(sys.argv[1:], logger)
229
            sys.exit(0)
230
        else:
231
            pids = (os.getpid(), newpid)
232
            logger.debug("%d, forked child: %d" % pids)
233
            children.append(pids[1])
234
        i += 1
235

    
236
    # Catch signals to ensure graceful shutdown
237
    signal(SIGINT,  _parent_handler)
238
    signal(SIGTERM, _parent_handler)
239

    
240
    # Wait for all children process to die, one by one
241
    for pid in children:
242
        try:
243
            os.waitpid(pid)
244
        except Exception as e:
245
            logger.error("Error waiting for child %d: %s"%(pid, e.message))
246
            raise
247

    
248

    
249
if __name__ == "__main__":
250
    logging.basicConfig(level=logging.DEBUG)
251
    sys.exit(main())
252

    
253
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :