Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 9cb903f9

History | View | Annotate | Download (7.9 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2011 Greek Research and Technology Network
4
#
5
""" Message queue setup and dispatch
6

7
This program sets up connections to the queues configured in settings.py
8
and implements the message wait and dispatch loops. Actual messages are
9
handled in the dispatched functions.
10

11
"""
12

    
13
from django.core.management import setup_environ
14

    
15
import sys
16
import os
17
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
18
sys.path.append(path)
19
import synnefo.settings as settings
20

    
21
setup_environ(settings)
22

    
23
from amqplib import client_0_8 as amqp
24
from signal import signal, SIGINT, SIGTERM
25

    
26
import logging
27
import time
28
import socket
29

    
30
from synnefo.logic import callbacks
31

    
32

    
33
class Dispatcher:
34

    
35
    logger = None
36
    chan = None
37
    debug = False
38
    clienttags = []
39

    
40
    def __init__(self, debug = False, logger = None):
41
        self.logger = logger
42
        self.debug = debug
43
        self._init()
44

    
45
    def wait(self):
46
        while True:
47
            try:
48
                self.chan.wait()
49
            except SystemExit:
50
                break
51
            except amqp.exceptions.AMQPConnectionException:
52
                self.logger.error("Server went away, reconnecting...")
53
                self._init()
54
            except socket.error:
55
                self.logger.error("Server went away, reconnecting...")
56
                self._init()
57

    
58
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
59
        self.chan.connection.close()
60
        self.chan.close()
61
        sys.exit()
62

    
63
    def _init(self):
64
        # Connect to RabbitMQ
65
        conn = None
66
        while conn == None:
67
            self.logger.info("Attempting to connect to %s",
68
                             settings.RABBIT_HOST)
69
            try:
70
                conn = amqp.Connection(host=settings.RABBIT_HOST,
71
                                       userid=settings.RABBIT_USERNAME,
72
                                       password=settings.RABBIT_PASSWORD,
73
                                       virtual_host=settings.RABBIT_VHOST)
74
            except socket.error:
75
                time.sleep(1)
76

    
77
        self.logger.info("Connection succesful, opening channel")
78
        self.chan = conn.channel()
79

    
80
        # Declare queues and exchanges
81
        for exchange in settings.EXCHANGES:
82
            self.chan.exchange_declare(exchange=exchange, type="topic",
83
                                       durable=True, auto_delete=False)
84

    
85
        for queue in settings.QUEUES:
86
            self.chan.queue_declare(queue=queue, durable=True,
87
                                    exclusive=False, auto_delete=False)
88

    
89
        bindings = settings.BINDINGS
90

    
91
        # Special queue for debugging, should not appear in production
92
        if self.debug:
93
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
94
                                    exclusive=False, auto_delete=False)
95
            bindings += settings.BINDINGS_DEBUG
96

    
97
        # Bind queues to handler methods
98
        for binding in bindings:
99
            try:
100
                callback = getattr(callbacks, binding[3])
101
            except AttributeError:
102
                self.logger.error("Cannot find callback %s" % binding[3])
103
                continue
104

    
105
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
106
                                 routing_key=binding[2])
107
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
108
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
109
                              (binding[1], binding[2], binding[0], binding[3]))
110
            self.clienttags.append(tag)
111

    
112

    
113
def _exit_handler(signum, frame):
114
    """"Catch exit signal in children processes."""
115
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
116
    raise SystemExit
117

    
118

    
119
def _parent_handler(signum, frame):
120
    """"Catch exit signal in parent process and forward it to children."""
121
    global children
122
    print "Caught signal %d, sending kill signal to children" % signum
123
    [os.kill(pid, SIGTERM) for pid in children]
124

    
125

    
126
def child(cmdline, logger):
127
    """The context of the child process"""
128

    
129
    # Cmd line argument parsing
130
    (opts, args) = parse_arguments(cmdline)
131
    disp = Dispatcher(debug = opts.debug, logger = logger)
132

    
133
    # Start the event loop
134
    disp.wait()
135

    
136

    
137
def parse_arguments(args):
138
    from optparse import OptionParser
139

    
140
    parser = OptionParser()
141
    parser.add_option("-d", "--debug", action="store_true", default=False,
142
                      dest="debug", help="Enable debug mode")
143
    parser.add_option("-l", "--log", dest="log_file",
144
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
145
                      help="Write log to FILE instead of %s" %
146
                           settings.DISPATCHER_LOG_FILE)
147
    parser.add_option("-c", "--cleanup-queues", action="store_true",
148
                      default=False, dest="cleanup_queues",
149
                      help="Remove all queues declared in settings.py (DANGEROUS!)")
150
    parser.add_option("-w", "--workers", default=2, dest="workers",
151
                      help="Number of workers to spawn", type="int")
152
    
153
    return parser.parse_args(args)
154

    
155

    
156
def cleanup_queues() :
157
    """Delete declared queues from RabbitMQ. Use with care!"""
158
    conn = amqp.Connection( host=settings.RABBIT_HOST,
159
                            userid=settings.RABBIT_USERNAME,
160
                            password=settings.RABBIT_PASSWORD,
161
                            virtual_host=settings.RABBIT_VHOST)
162
    chan = conn.channel()
163

    
164
    print "Queues to be deleted: ",  settings.QUEUES
165
    print "Exchnages to be deleted: ", settings.EXCHANGES
166
    ans = raw_input("Are you sure (N/y):")
167

    
168
    if not ans:
169
        return
170
    if ans not in ['Y', 'y']:
171
        return
172

    
173
    #for exchange in settings.EXCHANGES:
174
    #    try:
175
    #        chan.exchange_delete(exchange=exchange)
176
    #    except amqp.exceptions.AMQPChannelException as e:
177
    #        print e.amqp_reply_code, " ", e.amqp_reply_text
178

    
179
    for queue in settings.QUEUES:
180
        try:
181
            chan.queue_delete(queue=queue)
182
        except amqp.exceptions.AMQPChannelException as e:
183
            print e.amqp_reply_code, " ", e.amqp_reply_text
184
    chan.close()
185
    chan.connection.close()
186

    
187

    
188
def debug_mode(logger):
189
    disp = Dispatcher(debug = True, logger = logger)
190
    signal(SIGINT, _exit_handler)
191
    signal(SIGTERM, _exit_handler)
192

    
193
    disp.wait()
194

    
195

    
196
def main():
197
    global children, logger
198
    (opts, args) = parse_arguments(sys.argv[1:])
199

    
200
    # Initialize logger
201
    lvl = logging.DEBUG if opts.debug else logging.INFO
202
    logger = logging.getLogger("synnefo.dispatcher")
203
    logger.setLevel(lvl)
204
    formatter = logging.Formatter(
205
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
206
        "%Y-%m-%d %H:%M:%S")
207
    handler = logging.FileHandler(opts.log_file)
208
    handler.setFormatter(formatter)
209
    logger.addHandler(handler)
210

    
211
    # Special case for the clean up queues action
212
    if opts.cleanup_queues:
213
        cleanup_queues()
214
        return
215

    
216
    # Debug mode, process messages without spawning workers
217
    if opts.debug:
218
        debug_mode(logger = logger)
219
        return
220

    
221
    # Fork workers
222
    children = []
223

    
224
    i = 0
225
    while i < opts.workers:
226
        newpid = os.fork()
227

    
228
        if newpid == 0:
229
            signal(SIGINT, _exit_handler)
230
            signal(SIGTERM, _exit_handler)
231
            child(sys.argv[1:], logger)
232
            sys.exit(0)
233
        else:
234
            pids = (os.getpid(), newpid)
235
            logger.debug("%d, forked child: %d" % pids)
236
            children.append(pids[1])
237
        i += 1
238

    
239
    # Catch signals to ensure graceful shutdown
240
    signal(SIGINT,  _parent_handler)
241
    signal(SIGTERM, _parent_handler)
242

    
243
    # Wait for all children process to die, one by one
244
    for pid in children:
245
        try:
246
            os.waitpid(pid, 0)
247
        except Exception:
248
            pass
249

    
250

    
251
if __name__ == "__main__":
252
    logging.basicConfig(level=logging.DEBUG)
253
    sys.exit(main())
254

    
255
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :