Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 2cd99e7a

History | View | Annotate | Download (7.4 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 87ace70f Vassilios Karakoidas
#
3 c183005e Georgios Gousios
# Copyright (c) 2011 Greek Research and Technology Network
4 7bd50624 Vassilios Karakoidas
#
5 2cd99e7a Georgios Gousios
""" Message queue setup and dispatch
6 c183005e Georgios Gousios

7 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
8 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
9 2cd99e7a Georgios Gousios
handled in the dispatched functions.
10 fcbc5bb3 Vassilios Karakoidas

11 d08a5f6f Vangelis Koukis
"""
12 87ace70f Vassilios Karakoidas
13 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
14 c99fe4c7 Vassilios Karakoidas
15 d08a5f6f Vangelis Koukis
import sys
16 86221fd5 Panos Louridas
import os
17 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
18 86221fd5 Panos Louridas
sys.path.append(path)
19 a5c17ad3 Dimitris Moraitis
import synnefo.settings as settings
20 d08a5f6f Vangelis Koukis
21 d08a5f6f Vangelis Koukis
setup_environ(settings)
22 d08a5f6f Vangelis Koukis
23 da102335 Georgios Gousios
from amqplib import client_0_8 as amqp
24 8861126f Georgios Gousios
from signal import signal, SIGINT, SIGTERM
25 23c84263 Georgios Gousios
26 d08a5f6f Vangelis Koukis
import logging
27 8d8ea051 Georgios Gousios
import time
28 8d8ea051 Georgios Gousios
import socket
29 8d8ea051 Georgios Gousios
30 23c84263 Georgios Gousios
from synnefo.logic import dispatcher_callbacks
31 c99fe4c7 Vassilios Karakoidas
32 8861126f Georgios Gousios
33 78e2d194 Georgios Gousios
class Dispatcher:
34 8d8ea051 Georgios Gousios
35 78e2d194 Georgios Gousios
    logger = None
36 78e2d194 Georgios Gousios
    chan = None
37 5d081749 Georgios Gousios
    debug = False
38 5d081749 Georgios Gousios
    clienttags = []
39 78e2d194 Georgios Gousios
40 f30730c0 Georgios Gousios
    def __init__(self, debug = False, logger = None):
41 78e2d194 Georgios Gousios
        self.logger = logger
42 5d081749 Georgios Gousios
        self.debug = debug
43 5d081749 Georgios Gousios
        self._init()
44 da102335 Georgios Gousios
45 78e2d194 Georgios Gousios
    def wait(self):
46 78e2d194 Georgios Gousios
        while True:
47 78e2d194 Georgios Gousios
            try:
48 78e2d194 Georgios Gousios
                self.chan.wait()
49 78e2d194 Georgios Gousios
            except SystemExit:
50 78e2d194 Georgios Gousios
                break
51 5d081749 Georgios Gousios
            except socket.error:
52 5d081749 Georgios Gousios
                self.logger.error("Server went away, reconnecting...")
53 5d081749 Georgios Gousios
                self._init()
54 78e2d194 Georgios Gousios
55 5d081749 Georgios Gousios
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
56 78e2d194 Georgios Gousios
        self.chan.close()
57 78e2d194 Georgios Gousios
        self.chan.connection.close()
58 8861126f Georgios Gousios
        sys.exit()
59 78e2d194 Georgios Gousios
60 5d081749 Georgios Gousios
    def _init(self):
61 c183005e Georgios Gousios
        # Connect to RabbitMQ
62 23c84263 Georgios Gousios
        conn = None
63 23c84263 Georgios Gousios
        while conn == None:
64 c183005e Georgios Gousios
            self.logger.info("Attempting to connect to %s",
65 c183005e Georgios Gousios
                             settings.RABBIT_HOST)
66 23c84263 Georgios Gousios
            try:
67 41f2249e Vangelis Koukis
                conn = amqp.Connection(host=settings.RABBIT_HOST,
68 41f2249e Vangelis Koukis
                                       userid=settings.RABBIT_USERNAME,
69 41f2249e Vangelis Koukis
                                       password=settings.RABBIT_PASSWORD,
70 41f2249e Vangelis Koukis
                                       virtual_host=settings.RABBIT_VHOST)
71 23c84263 Georgios Gousios
            except socket.error:
72 23c84263 Georgios Gousios
                time.sleep(1)
73 23c84263 Georgios Gousios
74 23c84263 Georgios Gousios
        self.logger.info("Connection succesful, opening channel")
75 23c84263 Georgios Gousios
        self.chan = conn.channel()
76 78e2d194 Georgios Gousios
77 c183005e Georgios Gousios
        # Declare queues and exchanges
78 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
79 c183005e Georgios Gousios
            self.chan.exchange_declare(exchange=exchange, type="topic",
80 c183005e Georgios Gousios
                                       durable=True, auto_delete=False)
81 f30730c0 Georgios Gousios
82 f30730c0 Georgios Gousios
        for queue in settings.QUEUES:
83 c183005e Georgios Gousios
            self.chan.queue_declare(queue=queue, durable=True,
84 c183005e Georgios Gousios
                                    exclusive=False, auto_delete=False)
85 78e2d194 Georgios Gousios
86 23c84263 Georgios Gousios
        bindings = settings.BINDINGS
87 78e2d194 Georgios Gousios
88 c183005e Georgios Gousios
        # Special queue for debugging, should not appear in production
89 5d081749 Georgios Gousios
        if self.debug:
90 c183005e Georgios Gousios
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
91 c183005e Georgios Gousios
                                    exclusive=False, auto_delete=False)
92 23c84263 Georgios Gousios
            bindings += settings.BINDINGS_DEBUG
93 78e2d194 Georgios Gousios
94 c183005e Georgios Gousios
        # Bind queues to handler methods
95 5d081749 Georgios Gousios
        for binding in bindings:
96 78e2d194 Georgios Gousios
            try:
97 2cd99e7a Georgios Gousios
                callback = getattr(dispatcher_callbacks, binding[3])
98 23c84263 Georgios Gousios
            except AttributeError:
99 23c84263 Georgios Gousios
                self.logger.error("Cannot find callback %s" % binding[3])
100 c183005e Georgios Gousios
                continue
101 8d8ea051 Georgios Gousios
102 c183005e Georgios Gousios
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
103 c183005e Georgios Gousios
                                 routing_key=binding[2])
104 2cd99e7a Georgios Gousios
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
105 23c84263 Georgios Gousios
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
106 23c84263 Georgios Gousios
                              (binding[1], binding[2], binding[0], binding[3]))
107 23c84263 Georgios Gousios
            self.clienttags.append(tag)
108 8d8ea051 Georgios Gousios
109 c183005e Georgios Gousios
110 c183005e Georgios Gousios
def _exit_handler(signum, frame):
111 c183005e Georgios Gousios
    """"Catch exit signal in children processes."""
112 2cd99e7a Georgios Gousios
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
113 8d8ea051 Georgios Gousios
    raise SystemExit
114 8d8ea051 Georgios Gousios
115 c183005e Georgios Gousios
116 c183005e Georgios Gousios
def _parent_handler(signum, frame):
117 c183005e Georgios Gousios
    """"Catch exit signal in parent process and forward it to children."""
118 8861126f Georgios Gousios
    global children
119 8861126f Georgios Gousios
    print "Caught signal %d, sending kill signal to children" % signum
120 8861126f Georgios Gousios
    [os.kill(pid, SIGTERM) for pid in children]
121 8861126f Georgios Gousios
122 c183005e Georgios Gousios
123 8861126f Georgios Gousios
def child(cmdline, logger):
124 c183005e Georgios Gousios
    """The context of the child process"""
125 c183005e Georgios Gousios
126 c183005e Georgios Gousios
    # Cmd line argument parsing
127 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(cmdline)
128 2cd99e7a Georgios Gousios
    disp = Dispatcher(debug = opts.debug, logger = logger)
129 78e2d194 Georgios Gousios
130 c183005e Georgios Gousios
    # Start the event loop
131 2cd99e7a Georgios Gousios
    disp.wait()
132 78e2d194 Georgios Gousios
133 c183005e Georgios Gousios
134 78e2d194 Georgios Gousios
def parse_arguments(args):
135 78e2d194 Georgios Gousios
    from optparse import OptionParser
136 78e2d194 Georgios Gousios
137 78e2d194 Georgios Gousios
    parser = OptionParser()
138 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
139 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
140 78e2d194 Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
141 2cd99e7a Georgios Gousios
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
142 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
143 2cd99e7a Georgios Gousios
                           settings.DISPATCHER_LOG_FILE)
144 c183005e Georgios Gousios
    parser.add_option("-c", "--cleanup-queues", action="store_true",
145 c183005e Georgios Gousios
                      default=False, dest="cleanup_queues",
146 2cd99e7a Georgios Gousios
                      help="Remove all queues declared in settings.py (DANGEROUS!)")
147 c183005e Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
148 2cd99e7a Georgios Gousios
                      help="Number of workers to spawn", type="int")
149 f30730c0 Georgios Gousios
    
150 78e2d194 Georgios Gousios
    return parser.parse_args(args)
151 78e2d194 Georgios Gousios
152 f30730c0 Georgios Gousios
153 c183005e Georgios Gousios
def cleanup_queues() :
154 c183005e Georgios Gousios
    """Delete declared queues from RabbitMQ. Use with care!"""
155 f30730c0 Georgios Gousios
    conn = amqp.Connection( host=settings.RABBIT_HOST,
156 f30730c0 Georgios Gousios
                            userid=settings.RABBIT_USERNAME,
157 f30730c0 Georgios Gousios
                            password=settings.RABBIT_PASSWORD,
158 f30730c0 Georgios Gousios
                            virtual_host=settings.RABBIT_VHOST)
159 f30730c0 Georgios Gousios
    chan = conn.channel()
160 f30730c0 Georgios Gousios
161 f30730c0 Georgios Gousios
    print "Queues to be deleted: ",  settings.QUEUES
162 f30730c0 Georgios Gousios
    print "Exchnages to be deleted: ", settings.EXCHANGES
163 f30730c0 Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
164 f30730c0 Georgios Gousios
165 f30730c0 Georgios Gousios
    if not ans:
166 f30730c0 Georgios Gousios
        return
167 f30730c0 Georgios Gousios
    if ans not in ['Y', 'y']:
168 f30730c0 Georgios Gousios
        return
169 f30730c0 Georgios Gousios
170 f30730c0 Georgios Gousios
    for exchange in settings.EXCHANGES:
171 f30730c0 Georgios Gousios
        try:
172 f30730c0 Georgios Gousios
            chan.exchange_delete(exchange=exchange)
173 f30730c0 Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
174 f30730c0 Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
175 f30730c0 Georgios Gousios
176 f30730c0 Georgios Gousios
    for queue in settings.QUEUES:
177 f30730c0 Georgios Gousios
        try:
178 f30730c0 Georgios Gousios
            chan.queue_delete(queue=queue)
179 f30730c0 Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
180 f30730c0 Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
181 8861126f Georgios Gousios
    chan.close()
182 8861126f Georgios Gousios
    chan.connection.close()
183 f30730c0 Georgios Gousios
184 c183005e Georgios Gousios
185 78e2d194 Georgios Gousios
def main():
186 8861126f Georgios Gousios
    global children, logger
187 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
188 78e2d194 Georgios Gousios
189 8861126f Georgios Gousios
    # Initialize logger
190 8861126f Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
191 8861126f Georgios Gousios
    logger = logging.getLogger("synnefo.dispatcher")
192 8861126f Georgios Gousios
    logger.setLevel(lvl)
193 c183005e Georgios Gousios
    formatter = logging.Formatter(
194 2cd99e7a Georgios Gousios
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
195 2cd99e7a Georgios Gousios
        "%Y-%m-%d %H:%M:%S")
196 8861126f Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
197 8861126f Georgios Gousios
    handler.setFormatter(formatter)
198 8861126f Georgios Gousios
    logger.addHandler(handler)
199 8861126f Georgios Gousios
200 c183005e Georgios Gousios
    # Special case for the clean up queues action
201 f30730c0 Georgios Gousios
    if opts.cleanup_queues:
202 f30730c0 Georgios Gousios
        cleanup_queues()
203 f30730c0 Georgios Gousios
        return
204 f30730c0 Georgios Gousios
205 c183005e Georgios Gousios
    # Fork workers
206 8861126f Georgios Gousios
    children = []
207 8861126f Georgios Gousios
208 8861126f Georgios Gousios
    i = 0
209 8861126f Georgios Gousios
    while i < opts.workers:
210 8861126f Georgios Gousios
        newpid = os.fork()
211 8861126f Georgios Gousios
212 8861126f Georgios Gousios
        if newpid == 0:
213 c183005e Georgios Gousios
            signal(SIGINT, _exit_handler)
214 c183005e Georgios Gousios
            signal(SIGTERM, _exit_handler)
215 c183005e Georgios Gousios
            child(sys.argv[1:], logger)
216 8861126f Georgios Gousios
            sys.exit(0)
217 8861126f Georgios Gousios
        else:
218 8861126f Georgios Gousios
            pids = (os.getpid(), newpid)
219 8861126f Georgios Gousios
            logger.debug("%d, forked child: %d" % pids)
220 8861126f Georgios Gousios
            children.append(pids[1])
221 8861126f Georgios Gousios
        i += 1
222 8861126f Georgios Gousios
223 8d8ea051 Georgios Gousios
    # Catch signals to ensure graceful shutdown
224 c183005e Georgios Gousios
    signal(SIGINT,  _parent_handler)
225 c183005e Georgios Gousios
    signal(SIGTERM, _parent_handler)
226 8861126f Georgios Gousios
227 41f2249e Vangelis Koukis
    # Wait for all children process to die, one by one
228 2cd99e7a Georgios Gousios
    for pid in children:
229 41f2249e Vangelis Koukis
        try:
230 2cd99e7a Georgios Gousios
            os.waitpid(pid)
231 41f2249e Vangelis Koukis
        except Exception:
232 41f2249e Vangelis Koukis
            pass
233 5db87ed5 Vangelis Koukis
234 c183005e Georgios Gousios
235 d08a5f6f Vangelis Koukis
if __name__ == "__main__":
236 d08a5f6f Vangelis Koukis
    logging.basicConfig(level=logging.DEBUG)
237 d08a5f6f Vangelis Koukis
    sys.exit(main())
238 d08a5f6f Vangelis Koukis
239 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :