Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ cadaffb1

History | View | Annotate | Download (7.9 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2011 Greek Research and Technology Network
4
#
5
""" Message queue setup and dispatch
6

7
This program sets up connections to the queues configured in settings.py
8
and implements the message wait and dispatch loops. Actual messages are
9
handled in the dispatched functions.
10

11
"""
12

    
13
from django.core.management import setup_environ
14

    
15
import sys
16
import os
17
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
18
sys.path.append(path)
19
import synnefo.settings as settings
20

    
21
setup_environ(settings)
22

    
23
from amqplib import client_0_8 as amqp
24
from signal import signal, SIGINT, SIGTERM
25

    
26
import logging
27
import time
28
import socket
29

    
30
from synnefo.logic import dispatcher_callbacks
31

    
32

    
33
class Dispatcher:
34

    
35
    logger = None
36
    chan = None
37
    debug = False
38
    clienttags = []
39

    
40
    def __init__(self, debug = False, logger = None):
41
        self.logger = logger
42
        self.debug = debug
43
        self._init()
44

    
45
    def wait(self):
46
        while True:
47
            try:
48
                self.chan.wait()
49
            except SystemExit:
50
                break
51
            except amqp.exceptions.AMQPConnectionException:
52
                self.logger.error("Server went away, reconnecting...")
53
                self._init()
54
            except socket.error:
55
                self.logger.error("Server went away, reconnecting...")
56
                self._init()
57

    
58
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
59
        self.chan.connection.close()
60
        self.chan.close()
61
        sys.exit()
62

    
63
    def _init(self):
64
        # Connect to RabbitMQ
65
        conn = None
66
        while conn == None:
67
            self.logger.info("Attempting to connect to %s",
68
                             settings.RABBIT_HOST)
69
            try:
70
                conn = amqp.Connection(host=settings.RABBIT_HOST,
71
                                       userid=settings.RABBIT_USERNAME,
72
                                       password=settings.RABBIT_PASSWORD,
73
                                       virtual_host=settings.RABBIT_VHOST)
74
            except socket.error:
75
                time.sleep(1)
76

    
77
        self.logger.info("Connection succesful, opening channel")
78
        self.chan = conn.channel()
79

    
80
        # Declare queues and exchanges
81
        for exchange in settings.EXCHANGES:
82
            self.chan.exchange_declare(exchange=exchange, type="topic",
83
                                       durable=True, auto_delete=False)
84

    
85
        for queue in settings.QUEUES:
86
            self.chan.queue_declare(queue=queue, durable=True,
87
                                    exclusive=False, auto_delete=False)
88

    
89
        bindings = settings.BINDINGS
90

    
91
        # Special queue for debugging, should not appear in production
92
        if self.debug:
93
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
94
                                    exclusive=False, auto_delete=False)
95
            bindings += settings.BINDINGS_DEBUG
96

    
97
        # Bind queues to handler methods
98
        for binding in bindings:
99
            try:
100
                callback = getattr(dispatcher_callbacks, binding[3])
101
            except AttributeError:
102
                self.logger.error("Cannot find callback %s" % binding[3])
103
                continue
104

    
105
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
106
                                 routing_key=binding[2])
107
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
108
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
109
                              (binding[1], binding[2], binding[0], binding[3]))
110
            self.clienttags.append(tag)
111

    
112

    
113
def _exit_handler(signum, frame):
114
    """"Catch exit signal in children processes."""
115
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
116
    raise SystemExit
117

    
118

    
119
def _parent_handler(signum, frame):
120
    """"Catch exit signal in parent process and forward it to children."""
121
    global children
122
    print "Caught signal %d, sending kill signal to children" % signum
123
    [os.kill(pid, SIGTERM) for pid in children]
124

    
125

    
126
def child(cmdline, logger):
127
    """The context of the child process"""
128

    
129
    # Cmd line argument parsing
130
    (opts, args) = parse_arguments(cmdline)
131
    disp = Dispatcher(debug = opts.debug, logger = logger)
132

    
133
    # Start the event loop
134
    disp.wait()
135

    
136

    
137
def parse_arguments(args):
138
    from optparse import OptionParser
139

    
140
    parser = OptionParser()
141
    parser.add_option("-d", "--debug", action="store_true", default=False,
142
                      dest="debug", help="Enable debug mode")
143
    parser.add_option("-l", "--log", dest="log_file",
144
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
145
                      help="Write log to FILE instead of %s" %
146
                           settings.DISPATCHER_LOG_FILE)
147
    parser.add_option("-c", "--cleanup-queues", action="store_true",
148
                      default=False, dest="cleanup_queues",
149
                      help="Remove all queues declared in settings.py (DANGEROUS!)")
150
    parser.add_option("-w", "--workers", default=2, dest="workers",
151
                      help="Number of workers to spawn", type="int")
152
    
153
    return parser.parse_args(args)
154

    
155

    
156
def cleanup_queues() :
157
    """Delete declared queues from RabbitMQ. Use with care!"""
158
    conn = amqp.Connection( host=settings.RABBIT_HOST,
159
                            userid=settings.RABBIT_USERNAME,
160
                            password=settings.RABBIT_PASSWORD,
161
                            virtual_host=settings.RABBIT_VHOST)
162
    chan = conn.channel()
163

    
164
    print "Queues to be deleted: ",  settings.QUEUES
165
    print "Exchnages to be deleted: ", settings.EXCHANGES
166
    ans = raw_input("Are you sure (N/y):")
167

    
168
    if not ans:
169
        return
170
    if ans not in ['Y', 'y']:
171
        return
172

    
173
    for exchange in settings.EXCHANGES:
174
        try:
175
            chan.exchange_delete(exchange=exchange)
176
        except amqp.exceptions.AMQPChannelException as e:
177
            print e.amqp_reply_code, " ", e.amqp_reply_text
178

    
179
    for queue in settings.QUEUES:
180
        try:
181
            chan.queue_delete(queue=queue)
182
        except amqp.exceptions.AMQPChannelException as e:
183
            print e.amqp_reply_code, " ", e.amqp_reply_text
184
    chan.close()
185
    chan.connection.close()
186

    
187

    
188
def debug_mode(logger):
189
    disp = Dispatcher(debug = True, logger = logger)
190
    signal(SIGINT, _exit_handler)
191
    signal(SIGTERM, _exit_handler)
192

    
193
    disp.wait()
194

    
195

    
196
def main():
197
    global children, logger
198
    (opts, args) = parse_arguments(sys.argv[1:])
199

    
200
    # Initialize logger
201
    lvl = logging.DEBUG if opts.debug else logging.INFO
202
    logger = logging.getLogger("synnefo.dispatcher")
203
    logger.setLevel(lvl)
204
    formatter = logging.Formatter(
205
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
206
        "%Y-%m-%d %H:%M:%S")
207
    handler = logging.FileHandler(opts.log_file)
208
    handler.setFormatter(formatter)
209
    logger.addHandler(handler)
210

    
211
    # Special case for the clean up queues action
212
    if opts.cleanup_queues:
213
        cleanup_queues()
214
        return
215

    
216
    # Debug mode, process messages without spawning workers
217
    if opts.debug:
218
        debug_mode(logger = logger)
219
        return
220

    
221
    # Fork workers
222
    children = []
223

    
224
    i = 0
225
    while i < opts.workers:
226
        newpid = os.fork()
227

    
228
        if newpid == 0:
229
            signal(SIGINT, _exit_handler)
230
            signal(SIGTERM, _exit_handler)
231
            child(sys.argv[1:], logger)
232
            sys.exit(0)
233
        else:
234
            pids = (os.getpid(), newpid)
235
            logger.debug("%d, forked child: %d" % pids)
236
            children.append(pids[1])
237
        i += 1
238

    
239
    # Catch signals to ensure graceful shutdown
240
    signal(SIGINT,  _parent_handler)
241
    signal(SIGTERM, _parent_handler)
242

    
243
    # Wait for all children process to die, one by one
244
    for pid in children:
245
        try:
246
            os.waitpid(pid)
247
        except Exception as e:
248
            logger.error("Error waiting for child %d: %s"%(pid, e.message))
249
            raise
250

    
251

    
252
if __name__ == "__main__":
253
    logging.basicConfig(level=logging.DEBUG)
254
    sys.exit(main())
255

    
256
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :