root / logic / dispatcher.py @ 2cd99e7a
History | View | Annotate | Download (7.4 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 87ace70f | Vassilios Karakoidas | #
|
3 | c183005e | Georgios Gousios | # Copyright (c) 2011 Greek Research and Technology Network
|
4 | 7bd50624 | Vassilios Karakoidas | #
|
5 | 2cd99e7a | Georgios Gousios | """ Message queue setup and dispatch
|
6 | c183005e | Georgios Gousios |
|
7 | 2cd99e7a | Georgios Gousios | This program sets up connections to the queues configured in settings.py
|
8 | 2cd99e7a | Georgios Gousios | and implements the message wait and dispatch loops. Actual messages are
|
9 | 2cd99e7a | Georgios Gousios | handled in the dispatched functions.
|
10 | fcbc5bb3 | Vassilios Karakoidas |
|
11 | d08a5f6f | Vangelis Koukis | """
|
12 | 87ace70f | Vassilios Karakoidas | |
13 | d08a5f6f | Vangelis Koukis | from django.core.management import setup_environ |
14 | c99fe4c7 | Vassilios Karakoidas | |
15 | d08a5f6f | Vangelis Koukis | import sys |
16 | 86221fd5 | Panos Louridas | import os |
17 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
18 | 86221fd5 | Panos Louridas | sys.path.append(path) |
19 | a5c17ad3 | Dimitris Moraitis | import synnefo.settings as settings |
20 | d08a5f6f | Vangelis Koukis | |
21 | d08a5f6f | Vangelis Koukis | setup_environ(settings) |
22 | d08a5f6f | Vangelis Koukis | |
23 | da102335 | Georgios Gousios | from amqplib import client_0_8 as amqp |
24 | 8861126f | Georgios Gousios | from signal import signal, SIGINT, SIGTERM |
25 | 23c84263 | Georgios Gousios | |
26 | d08a5f6f | Vangelis Koukis | import logging |
27 | 8d8ea051 | Georgios Gousios | import time |
28 | 8d8ea051 | Georgios Gousios | import socket |
29 | 8d8ea051 | Georgios Gousios | |
30 | 23c84263 | Georgios Gousios | from synnefo.logic import dispatcher_callbacks |
31 | c99fe4c7 | Vassilios Karakoidas | |
32 | 8861126f | Georgios Gousios | |
33 | 78e2d194 | Georgios Gousios | class Dispatcher: |
34 | 8d8ea051 | Georgios Gousios | |
35 | 78e2d194 | Georgios Gousios | logger = None
|
36 | 78e2d194 | Georgios Gousios | chan = None
|
37 | 5d081749 | Georgios Gousios | debug = False
|
38 | 5d081749 | Georgios Gousios | clienttags = [] |
39 | 78e2d194 | Georgios Gousios | |
40 | f30730c0 | Georgios Gousios | def __init__(self, debug = False, logger = None): |
41 | 78e2d194 | Georgios Gousios | self.logger = logger
|
42 | 5d081749 | Georgios Gousios | self.debug = debug
|
43 | 5d081749 | Georgios Gousios | self._init()
|
44 | da102335 | Georgios Gousios | |
45 | 78e2d194 | Georgios Gousios | def wait(self): |
46 | 78e2d194 | Georgios Gousios | while True: |
47 | 78e2d194 | Georgios Gousios | try:
|
48 | 78e2d194 | Georgios Gousios | self.chan.wait()
|
49 | 78e2d194 | Georgios Gousios | except SystemExit: |
50 | 78e2d194 | Georgios Gousios | break
|
51 | 5d081749 | Georgios Gousios | except socket.error:
|
52 | 5d081749 | Georgios Gousios | self.logger.error("Server went away, reconnecting...") |
53 | 5d081749 | Georgios Gousios | self._init()
|
54 | 78e2d194 | Georgios Gousios | |
55 | 5d081749 | Georgios Gousios | [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags] |
56 | 78e2d194 | Georgios Gousios | self.chan.close()
|
57 | 78e2d194 | Georgios Gousios | self.chan.connection.close()
|
58 | 8861126f | Georgios Gousios | sys.exit() |
59 | 78e2d194 | Georgios Gousios | |
60 | 5d081749 | Georgios Gousios | def _init(self): |
61 | c183005e | Georgios Gousios | # Connect to RabbitMQ
|
62 | 23c84263 | Georgios Gousios | conn = None
|
63 | 23c84263 | Georgios Gousios | while conn == None: |
64 | c183005e | Georgios Gousios | self.logger.info("Attempting to connect to %s", |
65 | c183005e | Georgios Gousios | settings.RABBIT_HOST) |
66 | 23c84263 | Georgios Gousios | try:
|
67 | 41f2249e | Vangelis Koukis | conn = amqp.Connection(host=settings.RABBIT_HOST, |
68 | 41f2249e | Vangelis Koukis | userid=settings.RABBIT_USERNAME, |
69 | 41f2249e | Vangelis Koukis | password=settings.RABBIT_PASSWORD, |
70 | 41f2249e | Vangelis Koukis | virtual_host=settings.RABBIT_VHOST) |
71 | 23c84263 | Georgios Gousios | except socket.error:
|
72 | 23c84263 | Georgios Gousios | time.sleep(1)
|
73 | 23c84263 | Georgios Gousios | |
74 | 23c84263 | Georgios Gousios | self.logger.info("Connection succesful, opening channel") |
75 | 23c84263 | Georgios Gousios | self.chan = conn.channel()
|
76 | 78e2d194 | Georgios Gousios | |
77 | c183005e | Georgios Gousios | # Declare queues and exchanges
|
78 | f30730c0 | Georgios Gousios | for exchange in settings.EXCHANGES: |
79 | c183005e | Georgios Gousios | self.chan.exchange_declare(exchange=exchange, type="topic", |
80 | c183005e | Georgios Gousios | durable=True, auto_delete=False) |
81 | f30730c0 | Georgios Gousios | |
82 | f30730c0 | Georgios Gousios | for queue in settings.QUEUES: |
83 | c183005e | Georgios Gousios | self.chan.queue_declare(queue=queue, durable=True, |
84 | c183005e | Georgios Gousios | exclusive=False, auto_delete=False) |
85 | 78e2d194 | Georgios Gousios | |
86 | 23c84263 | Georgios Gousios | bindings = settings.BINDINGS |
87 | 78e2d194 | Georgios Gousios | |
88 | c183005e | Georgios Gousios | # Special queue for debugging, should not appear in production
|
89 | 5d081749 | Georgios Gousios | if self.debug: |
90 | c183005e | Georgios Gousios | self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, |
91 | c183005e | Georgios Gousios | exclusive=False, auto_delete=False) |
92 | 23c84263 | Georgios Gousios | bindings += settings.BINDINGS_DEBUG |
93 | 78e2d194 | Georgios Gousios | |
94 | c183005e | Georgios Gousios | # Bind queues to handler methods
|
95 | 5d081749 | Georgios Gousios | for binding in bindings: |
96 | 78e2d194 | Georgios Gousios | try:
|
97 | 2cd99e7a | Georgios Gousios | callback = getattr(dispatcher_callbacks, binding[3]) |
98 | 23c84263 | Georgios Gousios | except AttributeError: |
99 | 23c84263 | Georgios Gousios | self.logger.error("Cannot find callback %s" % binding[3]) |
100 | c183005e | Georgios Gousios | continue
|
101 | 8d8ea051 | Georgios Gousios | |
102 | c183005e | Georgios Gousios | self.chan.queue_bind(queue=binding[0], exchange=binding[1], |
103 | c183005e | Georgios Gousios | routing_key=binding[2])
|
104 | 2cd99e7a | Georgios Gousios | tag = self.chan.basic_consume(queue=binding[0], callback=callback) |
105 | 23c84263 | Georgios Gousios | self.logger.debug("Binding %s(%s) to queue %s with handler %s" % |
106 | 23c84263 | Georgios Gousios | (binding[1], binding[2], binding[0], binding[3])) |
107 | 23c84263 | Georgios Gousios | self.clienttags.append(tag)
|
108 | 8d8ea051 | Georgios Gousios | |
109 | c183005e | Georgios Gousios | |
110 | c183005e | Georgios Gousios | def _exit_handler(signum, frame): |
111 | c183005e | Georgios Gousios | """"Catch exit signal in children processes."""
|
112 | 2cd99e7a | Georgios Gousios | print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum) |
113 | 8d8ea051 | Georgios Gousios | raise SystemExit |
114 | 8d8ea051 | Georgios Gousios | |
115 | c183005e | Georgios Gousios | |
116 | c183005e | Georgios Gousios | def _parent_handler(signum, frame): |
117 | c183005e | Georgios Gousios | """"Catch exit signal in parent process and forward it to children."""
|
118 | 8861126f | Georgios Gousios | global children
|
119 | 8861126f | Georgios Gousios | print "Caught signal %d, sending kill signal to children" % signum |
120 | 8861126f | Georgios Gousios | [os.kill(pid, SIGTERM) for pid in children] |
121 | 8861126f | Georgios Gousios | |
122 | c183005e | Georgios Gousios | |
123 | 8861126f | Georgios Gousios | def child(cmdline, logger): |
124 | c183005e | Georgios Gousios | """The context of the child process"""
|
125 | c183005e | Georgios Gousios | |
126 | c183005e | Georgios Gousios | # Cmd line argument parsing
|
127 | 78e2d194 | Georgios Gousios | (opts, args) = parse_arguments(cmdline) |
128 | 2cd99e7a | Georgios Gousios | disp = Dispatcher(debug = opts.debug, logger = logger) |
129 | 78e2d194 | Georgios Gousios | |
130 | c183005e | Georgios Gousios | # Start the event loop
|
131 | 2cd99e7a | Georgios Gousios | disp.wait() |
132 | 78e2d194 | Georgios Gousios | |
133 | c183005e | Georgios Gousios | |
134 | 78e2d194 | Georgios Gousios | def parse_arguments(args): |
135 | 78e2d194 | Georgios Gousios | from optparse import OptionParser |
136 | 78e2d194 | Georgios Gousios | |
137 | 78e2d194 | Georgios Gousios | parser = OptionParser() |
138 | c183005e | Georgios Gousios | parser.add_option("-d", "--debug", action="store_true", default=False, |
139 | 2cd99e7a | Georgios Gousios | dest="debug", help="Enable debug mode") |
140 | 78e2d194 | Georgios Gousios | parser.add_option("-l", "--log", dest="log_file", |
141 | 2cd99e7a | Georgios Gousios | default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
|
142 | 2cd99e7a | Georgios Gousios | help="Write log to FILE instead of %s" %
|
143 | 2cd99e7a | Georgios Gousios | settings.DISPATCHER_LOG_FILE) |
144 | c183005e | Georgios Gousios | parser.add_option("-c", "--cleanup-queues", action="store_true", |
145 | c183005e | Georgios Gousios | default=False, dest="cleanup_queues", |
146 | 2cd99e7a | Georgios Gousios | help="Remove all queues declared in settings.py (DANGEROUS!)")
|
147 | c183005e | Georgios Gousios | parser.add_option("-w", "--workers", default=2, dest="workers", |
148 | 2cd99e7a | Georgios Gousios | help="Number of workers to spawn", type="int") |
149 | f30730c0 | Georgios Gousios | |
150 | 78e2d194 | Georgios Gousios | return parser.parse_args(args)
|
151 | 78e2d194 | Georgios Gousios | |
152 | f30730c0 | Georgios Gousios | |
153 | c183005e | Georgios Gousios | def cleanup_queues() : |
154 | c183005e | Georgios Gousios | """Delete declared queues from RabbitMQ. Use with care!"""
|
155 | f30730c0 | Georgios Gousios | conn = amqp.Connection( host=settings.RABBIT_HOST, |
156 | f30730c0 | Georgios Gousios | userid=settings.RABBIT_USERNAME, |
157 | f30730c0 | Georgios Gousios | password=settings.RABBIT_PASSWORD, |
158 | f30730c0 | Georgios Gousios | virtual_host=settings.RABBIT_VHOST) |
159 | f30730c0 | Georgios Gousios | chan = conn.channel() |
160 | f30730c0 | Georgios Gousios | |
161 | f30730c0 | Georgios Gousios | print "Queues to be deleted: ", settings.QUEUES |
162 | f30730c0 | Georgios Gousios | print "Exchnages to be deleted: ", settings.EXCHANGES |
163 | f30730c0 | Georgios Gousios | ans = raw_input("Are you sure (N/y):") |
164 | f30730c0 | Georgios Gousios | |
165 | f30730c0 | Georgios Gousios | if not ans: |
166 | f30730c0 | Georgios Gousios | return
|
167 | f30730c0 | Georgios Gousios | if ans not in ['Y', 'y']: |
168 | f30730c0 | Georgios Gousios | return
|
169 | f30730c0 | Georgios Gousios | |
170 | f30730c0 | Georgios Gousios | for exchange in settings.EXCHANGES: |
171 | f30730c0 | Georgios Gousios | try:
|
172 | f30730c0 | Georgios Gousios | chan.exchange_delete(exchange=exchange) |
173 | f30730c0 | Georgios Gousios | except amqp.exceptions.AMQPChannelException as e: |
174 | f30730c0 | Georgios Gousios | print e.amqp_reply_code, " ", e.amqp_reply_text |
175 | f30730c0 | Georgios Gousios | |
176 | f30730c0 | Georgios Gousios | for queue in settings.QUEUES: |
177 | f30730c0 | Georgios Gousios | try:
|
178 | f30730c0 | Georgios Gousios | chan.queue_delete(queue=queue) |
179 | f30730c0 | Georgios Gousios | except amqp.exceptions.AMQPChannelException as e: |
180 | f30730c0 | Georgios Gousios | print e.amqp_reply_code, " ", e.amqp_reply_text |
181 | 8861126f | Georgios Gousios | chan.close() |
182 | 8861126f | Georgios Gousios | chan.connection.close() |
183 | f30730c0 | Georgios Gousios | |
184 | c183005e | Georgios Gousios | |
185 | 78e2d194 | Georgios Gousios | def main(): |
186 | 8861126f | Georgios Gousios | global children, logger
|
187 | 78e2d194 | Georgios Gousios | (opts, args) = parse_arguments(sys.argv[1:])
|
188 | 78e2d194 | Georgios Gousios | |
189 | 8861126f | Georgios Gousios | # Initialize logger
|
190 | 8861126f | Georgios Gousios | lvl = logging.DEBUG if opts.debug else logging.INFO |
191 | 8861126f | Georgios Gousios | logger = logging.getLogger("synnefo.dispatcher")
|
192 | 8861126f | Georgios Gousios | logger.setLevel(lvl) |
193 | c183005e | Georgios Gousios | formatter = logging.Formatter( |
194 | 2cd99e7a | Georgios Gousios | "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
|
195 | 2cd99e7a | Georgios Gousios | "%Y-%m-%d %H:%M:%S")
|
196 | 8861126f | Georgios Gousios | handler = logging.FileHandler(opts.log_file) |
197 | 8861126f | Georgios Gousios | handler.setFormatter(formatter) |
198 | 8861126f | Georgios Gousios | logger.addHandler(handler) |
199 | 8861126f | Georgios Gousios | |
200 | c183005e | Georgios Gousios | # Special case for the clean up queues action
|
201 | f30730c0 | Georgios Gousios | if opts.cleanup_queues:
|
202 | f30730c0 | Georgios Gousios | cleanup_queues() |
203 | f30730c0 | Georgios Gousios | return
|
204 | f30730c0 | Georgios Gousios | |
205 | c183005e | Georgios Gousios | # Fork workers
|
206 | 8861126f | Georgios Gousios | children = [] |
207 | 8861126f | Georgios Gousios | |
208 | 8861126f | Georgios Gousios | i = 0
|
209 | 8861126f | Georgios Gousios | while i < opts.workers:
|
210 | 8861126f | Georgios Gousios | newpid = os.fork() |
211 | 8861126f | Georgios Gousios | |
212 | 8861126f | Georgios Gousios | if newpid == 0: |
213 | c183005e | Georgios Gousios | signal(SIGINT, _exit_handler) |
214 | c183005e | Georgios Gousios | signal(SIGTERM, _exit_handler) |
215 | c183005e | Georgios Gousios | child(sys.argv[1:], logger)
|
216 | 8861126f | Georgios Gousios | sys.exit(0)
|
217 | 8861126f | Georgios Gousios | else:
|
218 | 8861126f | Georgios Gousios | pids = (os.getpid(), newpid) |
219 | 8861126f | Georgios Gousios | logger.debug("%d, forked child: %d" % pids)
|
220 | 8861126f | Georgios Gousios | children.append(pids[1])
|
221 | 8861126f | Georgios Gousios | i += 1
|
222 | 8861126f | Georgios Gousios | |
223 | 8d8ea051 | Georgios Gousios | # Catch signals to ensure graceful shutdown
|
224 | c183005e | Georgios Gousios | signal(SIGINT, _parent_handler) |
225 | c183005e | Georgios Gousios | signal(SIGTERM, _parent_handler) |
226 | 8861126f | Georgios Gousios | |
227 | 41f2249e | Vangelis Koukis | # Wait for all children process to die, one by one
|
228 | 2cd99e7a | Georgios Gousios | for pid in children: |
229 | 41f2249e | Vangelis Koukis | try:
|
230 | 2cd99e7a | Georgios Gousios | os.waitpid(pid) |
231 | 41f2249e | Vangelis Koukis | except Exception: |
232 | 41f2249e | Vangelis Koukis | pass
|
233 | 5db87ed5 | Vangelis Koukis | |
234 | c183005e | Georgios Gousios | |
235 | d08a5f6f | Vangelis Koukis | if __name__ == "__main__": |
236 | d08a5f6f | Vangelis Koukis | logging.basicConfig(level=logging.DEBUG) |
237 | d08a5f6f | Vangelis Koukis | sys.exit(main()) |
238 | d08a5f6f | Vangelis Koukis | |
239 | 8d8ea051 | Georgios Gousios | # vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |