Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ a4d2780c

History | View | Annotate | Download (11.8 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
40 c99fe4c7 Vassilios Karakoidas
41 d08a5f6f Vangelis Koukis
import sys
42 86221fd5 Panos Louridas
import os
43 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44 86221fd5 Panos Louridas
sys.path.append(path)
45 a5c17ad3 Dimitris Moraitis
import synnefo.settings as settings
46 f13691b3 Georgios Gousios
from synnefo.logic import log
47 d08a5f6f Vangelis Koukis
48 d08a5f6f Vangelis Koukis
setup_environ(settings)
49 d08a5f6f Vangelis Koukis
50 da102335 Georgios Gousios
from amqplib import client_0_8 as amqp
51 8861126f Georgios Gousios
from signal import signal, SIGINT, SIGTERM
52 23c84263 Georgios Gousios
53 8d8ea051 Georgios Gousios
import time
54 8d8ea051 Georgios Gousios
import socket
55 4ed2e471 Georgios Gousios
from daemon import daemon
56 4ed2e471 Georgios Gousios
57 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
58 4ed2e471 Georgios Gousios
try:
59 4ed2e471 Georgios Gousios
    from daemon import pidfile
60 4ed2e471 Georgios Gousios
except:
61 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
62 8d8ea051 Georgios Gousios
63 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
64 c99fe4c7 Vassilios Karakoidas
65 78e2d194 Georgios Gousios
class Dispatcher:
66 8d8ea051 Georgios Gousios
67 78e2d194 Georgios Gousios
    logger = None
68 78e2d194 Georgios Gousios
    chan = None
69 5d081749 Georgios Gousios
    debug = False
70 5d081749 Georgios Gousios
    clienttags = []
71 78e2d194 Georgios Gousios
72 57d0082a Georgios Gousios
    def __init__(self, debug = False):
73 f13691b3 Georgios Gousios
        
74 57d0082a Georgios Gousios
        # Initialize logger
75 f13691b3 Georgios Gousios
        self.logger = log.get_logger('synnefo.dispatcher')
76 57d0082a Georgios Gousios
77 5d081749 Georgios Gousios
        self.debug = debug
78 5d081749 Georgios Gousios
        self._init()
79 da102335 Georgios Gousios
80 78e2d194 Georgios Gousios
    def wait(self):
81 78e2d194 Georgios Gousios
        while True:
82 78e2d194 Georgios Gousios
            try:
83 78e2d194 Georgios Gousios
                self.chan.wait()
84 78e2d194 Georgios Gousios
            except SystemExit:
85 78e2d194 Georgios Gousios
                break
86 cadaffb1 Georgios Gousios
            except amqp.exceptions.AMQPConnectionException:
87 cadaffb1 Georgios Gousios
                self.logger.error("Server went away, reconnecting...")
88 cadaffb1 Georgios Gousios
                self._init()
89 5d081749 Georgios Gousios
            except socket.error:
90 5d081749 Georgios Gousios
                self.logger.error("Server went away, reconnecting...")
91 5d081749 Georgios Gousios
                self._init()
92 78e2d194 Georgios Gousios
93 5d081749 Georgios Gousios
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
94 78e2d194 Georgios Gousios
        self.chan.connection.close()
95 838239fa Georgios Gousios
        self.chan.close()
96 78e2d194 Georgios Gousios
97 5d081749 Georgios Gousios
    def _init(self):
98 57d0082a Georgios Gousios
        self.logger.info("Initializing")
99 57d0082a Georgios Gousios
        
100 c183005e Georgios Gousios
        # Connect to RabbitMQ
101 23c84263 Georgios Gousios
        conn = None
102 23c84263 Georgios Gousios
        while conn == None:
103 c183005e Georgios Gousios
            self.logger.info("Attempting to connect to %s",
104 c183005e Georgios Gousios
                             settings.RABBIT_HOST)
105 23c84263 Georgios Gousios
            try:
106 41f2249e Vangelis Koukis
                conn = amqp.Connection(host=settings.RABBIT_HOST,
107 41f2249e Vangelis Koukis
                                       userid=settings.RABBIT_USERNAME,
108 41f2249e Vangelis Koukis
                                       password=settings.RABBIT_PASSWORD,
109 41f2249e Vangelis Koukis
                                       virtual_host=settings.RABBIT_VHOST)
110 23c84263 Georgios Gousios
            except socket.error:
111 23c84263 Georgios Gousios
                time.sleep(1)
112 23c84263 Georgios Gousios
113 23c84263 Georgios Gousios
        self.logger.info("Connection succesful, opening channel")
114 23c84263 Georgios Gousios
        self.chan = conn.channel()
115 78e2d194 Georgios Gousios
116 c183005e Georgios Gousios
        # Declare queues and exchanges
117 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
118 c183005e Georgios Gousios
            self.chan.exchange_declare(exchange=exchange, type="topic",
119 c183005e Georgios Gousios
                                       durable=True, auto_delete=False)
120 f30730c0 Georgios Gousios
121 f30730c0 Georgios Gousios
        for queue in settings.QUEUES:
122 c183005e Georgios Gousios
            self.chan.queue_declare(queue=queue, durable=True,
123 c183005e Georgios Gousios
                                    exclusive=False, auto_delete=False)
124 78e2d194 Georgios Gousios
125 23c84263 Georgios Gousios
        bindings = settings.BINDINGS
126 78e2d194 Georgios Gousios
127 c183005e Georgios Gousios
        # Special queue for debugging, should not appear in production
128 daeb288f Georgios Gousios
        if self.debug and settings.DEBUG:
129 c183005e Georgios Gousios
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
130 c183005e Georgios Gousios
                                    exclusive=False, auto_delete=False)
131 23c84263 Georgios Gousios
            bindings += settings.BINDINGS_DEBUG
132 78e2d194 Georgios Gousios
133 c183005e Georgios Gousios
        # Bind queues to handler methods
134 5d081749 Georgios Gousios
        for binding in bindings:
135 78e2d194 Georgios Gousios
            try:
136 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
137 23c84263 Georgios Gousios
            except AttributeError:
138 23c84263 Georgios Gousios
                self.logger.error("Cannot find callback %s" % binding[3])
139 c183005e Georgios Gousios
                continue
140 8d8ea051 Georgios Gousios
141 c183005e Georgios Gousios
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
142 c183005e Georgios Gousios
                                 routing_key=binding[2])
143 2cd99e7a Georgios Gousios
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
144 23c84263 Georgios Gousios
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
145 23c84263 Georgios Gousios
                              (binding[1], binding[2], binding[0], binding[3]))
146 23c84263 Georgios Gousios
            self.clienttags.append(tag)
147 8d8ea051 Georgios Gousios
148 c183005e Georgios Gousios
149 c183005e Georgios Gousios
def _exit_handler(signum, frame):
150 c183005e Georgios Gousios
    """"Catch exit signal in children processes."""
151 2cd99e7a Georgios Gousios
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
152 8d8ea051 Georgios Gousios
    raise SystemExit
153 8d8ea051 Georgios Gousios
154 c183005e Georgios Gousios
155 c183005e Georgios Gousios
def _parent_handler(signum, frame):
156 c183005e Georgios Gousios
    """"Catch exit signal in parent process and forward it to children."""
157 8861126f Georgios Gousios
    global children
158 8861126f Georgios Gousios
    print "Caught signal %d, sending kill signal to children" % signum
159 8861126f Georgios Gousios
    [os.kill(pid, SIGTERM) for pid in children]
160 8861126f Georgios Gousios
161 c183005e Georgios Gousios
162 57d0082a Georgios Gousios
def child(cmdline):
163 c183005e Georgios Gousios
    """The context of the child process"""
164 c183005e Georgios Gousios
165 c183005e Georgios Gousios
    # Cmd line argument parsing
166 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(cmdline)
167 57d0082a Georgios Gousios
    disp = Dispatcher(debug = opts.debug)
168 78e2d194 Georgios Gousios
169 c183005e Georgios Gousios
    # Start the event loop
170 2cd99e7a Georgios Gousios
    disp.wait()
171 78e2d194 Georgios Gousios
172 c183005e Georgios Gousios
173 78e2d194 Georgios Gousios
def parse_arguments(args):
174 78e2d194 Georgios Gousios
    from optparse import OptionParser
175 78e2d194 Georgios Gousios
176 78e2d194 Georgios Gousios
    parser = OptionParser()
177 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
178 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
179 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
180 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
181 e6209aa2 Georgios Gousios
    parser.add_option("-p", '--pid-file', dest="pid_file",
182 e6209aa2 Georgios Gousios
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
183 e6209aa2 Georgios Gousios
                      help="Save PID to file (default:%s)" %
184 e6209aa2 Georgios Gousios
                           os.path.join(os.getcwd(), "dispatcher.pid"))
185 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
186 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
187 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
188 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
189 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
190 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
191 979482ce Georgios Gousios
                           first (DANGEROUS!)")
192 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
193 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
194 de081774 Georgios Gousios
195 78e2d194 Georgios Gousios
    return parser.parse_args(args)
196 78e2d194 Georgios Gousios
197 f30730c0 Georgios Gousios
198 979482ce Georgios Gousios
def purge_queues() :
199 979482ce Georgios Gousios
    """
200 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
201 979482ce Georgios Gousios
    """
202 979482ce Georgios Gousios
    conn = get_connection()
203 f30730c0 Georgios Gousios
    chan = conn.channel()
204 f30730c0 Georgios Gousios
205 f30730c0 Georgios Gousios
    print "Queues to be deleted: ",  settings.QUEUES
206 f30730c0 Georgios Gousios
207 979482ce Georgios Gousios
    if not get_user_confirmation():
208 f30730c0 Georgios Gousios
        return
209 f30730c0 Georgios Gousios
210 f30730c0 Georgios Gousios
    for queue in settings.QUEUES:
211 f30730c0 Georgios Gousios
        try:
212 f30730c0 Georgios Gousios
            chan.queue_delete(queue=queue)
213 979482ce Georgios Gousios
            print "Deleting queue %s" % queue
214 979482ce Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
215 979482ce Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
216 979482ce Georgios Gousios
            chan = conn.channel()
217 979482ce Georgios Gousios
218 979482ce Georgios Gousios
    chan.connection.close()
219 979482ce Georgios Gousios
220 979482ce Georgios Gousios
221 979482ce Georgios Gousios
def purge_exchanges():
222 979482ce Georgios Gousios
    """
223 979482ce Georgios Gousios
        Delete declared exchanges from RabbitMQ, after removing all queues first
224 979482ce Georgios Gousios
    """
225 979482ce Georgios Gousios
    purge_queues()
226 979482ce Georgios Gousios
227 979482ce Georgios Gousios
    conn = get_connection()
228 979482ce Georgios Gousios
    chan = conn.channel()
229 979482ce Georgios Gousios
230 979482ce Georgios Gousios
    print "Exchnages to be deleted: ", settings.EXCHANGES
231 979482ce Georgios Gousios
232 979482ce Georgios Gousios
    if not get_user_confirmation():
233 979482ce Georgios Gousios
        return
234 979482ce Georgios Gousios
235 979482ce Georgios Gousios
    for exchange in settings.EXCHANGES:
236 979482ce Georgios Gousios
        try:
237 979482ce Georgios Gousios
            chan.exchange_delete(exchange=exchange)
238 f30730c0 Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
239 f30730c0 Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
240 979482ce Georgios Gousios
241 8861126f Georgios Gousios
    chan.connection.close()
242 f30730c0 Georgios Gousios
243 c183005e Georgios Gousios
244 979482ce Georgios Gousios
def drain_queue(queue):
245 979482ce Georgios Gousios
    """
246 979482ce Georgios Gousios
        Strip a (declared) queue from all outstanding messages
247 979482ce Georgios Gousios
    """
248 979482ce Georgios Gousios
    if not queue:
249 979482ce Georgios Gousios
        return
250 979482ce Georgios Gousios
251 979482ce Georgios Gousios
    if not queue in settings.QUEUES:
252 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
253 979482ce Georgios Gousios
        return
254 979482ce Georgios Gousios
255 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
256 979482ce Georgios Gousios
257 979482ce Georgios Gousios
    if not get_user_confirmation():
258 979482ce Georgios Gousios
        return
259 979482ce Georgios Gousios
    conn = get_connection()
260 979482ce Georgios Gousios
    chan = conn.channel()
261 979482ce Georgios Gousios
262 e6209aa2 Georgios Gousios
    # Register a temporary queue binding
263 e6209aa2 Georgios Gousios
    for binding in settings.BINDINGS:
264 e6209aa2 Georgios Gousios
        if binding[0] == queue:
265 e6209aa2 Georgios Gousios
            exch = binding[1]
266 e6209aa2 Georgios Gousios
267 e6209aa2 Georgios Gousios
    if not exch:
268 e6209aa2 Georgios Gousios
        print "Queue not bound to any exchange: %s" % queue
269 e6209aa2 Georgios Gousios
        return
270 e6209aa2 Georgios Gousios
271 e6209aa2 Georgios Gousios
    chan.queue_bind(queue=queue, exchange=exch,routing_key='#')
272 e6209aa2 Georgios Gousios
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
273 e6209aa2 Georgios Gousios
274 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
275 e6209aa2 Georgios Gousios
    time.sleep(2)
276 e6209aa2 Georgios Gousios
    print "Queue draining starting"
277 e6209aa2 Georgios Gousios
278 e6209aa2 Georgios Gousios
    signal(SIGTERM, _exit_handler)
279 e6209aa2 Georgios Gousios
    signal(SIGINT, _exit_handler)
280 e6209aa2 Georgios Gousios
281 c626e1d0 Georgios Gousios
    num_processed = 0
282 e6209aa2 Georgios Gousios
    while True:
283 e6209aa2 Georgios Gousios
        chan.wait()
284 c626e1d0 Georgios Gousios
        num_processed += 1
285 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
286 c626e1d0 Georgios Gousios
287 e6209aa2 Georgios Gousios
    chan.basic_cancel(tag)
288 979482ce Georgios Gousios
    chan.connection.close()
289 979482ce Georgios Gousios
290 95aee02c Vangelis Koukis
291 979482ce Georgios Gousios
def get_connection():
292 979482ce Georgios Gousios
    conn = amqp.Connection( host=settings.RABBIT_HOST,
293 979482ce Georgios Gousios
                        userid=settings.RABBIT_USERNAME,
294 979482ce Georgios Gousios
                        password=settings.RABBIT_PASSWORD,
295 979482ce Georgios Gousios
                        virtual_host=settings.RABBIT_VHOST)
296 979482ce Georgios Gousios
    return conn
297 979482ce Georgios Gousios
298 95aee02c Vangelis Koukis
299 979482ce Georgios Gousios
def get_user_confirmation():
300 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
301 979482ce Georgios Gousios
302 979482ce Georgios Gousios
    if not ans:
303 979482ce Georgios Gousios
        return False
304 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
305 979482ce Georgios Gousios
        return False
306 979482ce Georgios Gousios
    return True
307 979482ce Georgios Gousios
308 979482ce Georgios Gousios
309 57d0082a Georgios Gousios
def debug_mode():
310 57d0082a Georgios Gousios
    disp = Dispatcher(debug = True)
311 838239fa Georgios Gousios
    signal(SIGINT, _exit_handler)
312 838239fa Georgios Gousios
    signal(SIGTERM, _exit_handler)
313 838239fa Georgios Gousios
314 838239fa Georgios Gousios
    disp.wait()
315 838239fa Georgios Gousios
316 838239fa Georgios Gousios
317 78e2d194 Georgios Gousios
def main():
318 8861126f Georgios Gousios
    global children, logger
319 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
320 78e2d194 Georgios Gousios
321 f13691b3 Georgios Gousios
    logger = log.get_logger("synnefo.dispatcher")
322 8861126f Georgios Gousios
323 c183005e Georgios Gousios
    # Special case for the clean up queues action
324 979482ce Georgios Gousios
    if opts.purge_queues:
325 979482ce Georgios Gousios
        purge_queues()
326 979482ce Georgios Gousios
        return
327 979482ce Georgios Gousios
328 979482ce Georgios Gousios
    # Special case for the clean up exch action
329 979482ce Georgios Gousios
    if opts.purge_exchanges:
330 979482ce Georgios Gousios
        purge_exchanges()
331 979482ce Georgios Gousios
        return
332 979482ce Georgios Gousios
333 d5470cdd Georgios Gousios
    if opts.drain_queue:
334 d5470cdd Georgios Gousios
        drain_queue(opts.drain_queue)
335 f30730c0 Georgios Gousios
        return
336 f30730c0 Georgios Gousios
337 838239fa Georgios Gousios
    # Debug mode, process messages without spawning workers
338 838239fa Georgios Gousios
    if opts.debug:
339 57d0082a Georgios Gousios
        debug_mode()
340 838239fa Georgios Gousios
        return
341 838239fa Georgios Gousios
342 57d0082a Georgios Gousios
    # Become a daemon
343 57d0082a Georgios Gousios
    daemon_context = daemon.DaemonContext(
344 4dc0b46a Georgios Gousios
        stdout=sys.stdout,
345 4dc0b46a Georgios Gousios
        stderr=sys.stderr,
346 4dc0b46a Georgios Gousios
        umask=022)
347 4dc0b46a Georgios Gousios
348 57d0082a Georgios Gousios
    daemon_context.open()
349 de081774 Georgios Gousios
350 4ed2e471 Georgios Gousios
    # Create pidfile. Take care of differences between python-daemon versions.
351 4ed2e471 Georgios Gousios
    try:
352 4ed2e471 Georgios Gousios
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
353 4ed2e471 Georgios Gousios
    except:
354 4ed2e471 Georgios Gousios
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
355 4ed2e471 Georgios Gousios
356 de081774 Georgios Gousios
    pidf.acquire()
357 de081774 Georgios Gousios
358 57d0082a Georgios Gousios
    logger.info("Became a daemon")
359 57d0082a Georgios Gousios
360 c183005e Georgios Gousios
    # Fork workers
361 8861126f Georgios Gousios
    children = []
362 8861126f Georgios Gousios
363 8861126f Georgios Gousios
    i = 0
364 8861126f Georgios Gousios
    while i < opts.workers:
365 8861126f Georgios Gousios
        newpid = os.fork()
366 8861126f Georgios Gousios
367 8861126f Georgios Gousios
        if newpid == 0:
368 4dc0b46a Georgios Gousios
            signal(SIGINT,  _exit_handler)
369 c183005e Georgios Gousios
            signal(SIGTERM, _exit_handler)
370 57d0082a Georgios Gousios
            child(sys.argv[1:])
371 4dc0b46a Georgios Gousios
            sys.exit(1)
372 8861126f Georgios Gousios
        else:
373 8861126f Georgios Gousios
            pids = (os.getpid(), newpid)
374 8861126f Georgios Gousios
            logger.debug("%d, forked child: %d" % pids)
375 8861126f Georgios Gousios
            children.append(pids[1])
376 8861126f Georgios Gousios
        i += 1
377 8861126f Georgios Gousios
378 8d8ea051 Georgios Gousios
    # Catch signals to ensure graceful shutdown
379 c183005e Georgios Gousios
    signal(SIGINT,  _parent_handler)
380 c183005e Georgios Gousios
    signal(SIGTERM, _parent_handler)
381 8861126f Georgios Gousios
382 57d0082a Georgios Gousios
    # Wait for all children processes to die, one by one
383 de081774 Georgios Gousios
    try :
384 de081774 Georgios Gousios
        for pid in children:
385 de081774 Georgios Gousios
            try:
386 de081774 Georgios Gousios
                os.waitpid(pid, 0)
387 de081774 Georgios Gousios
            except Exception:
388 de081774 Georgios Gousios
                pass
389 de081774 Georgios Gousios
    finally:
390 de081774 Georgios Gousios
        pidf.release()
391 c183005e Georgios Gousios
392 d08a5f6f Vangelis Koukis
if __name__ == "__main__":
393 d08a5f6f Vangelis Koukis
    sys.exit(main())
394 d08a5f6f Vangelis Koukis
395 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :