Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 44e2c577

History | View | Annotate | Download (12.9 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
40 c99fe4c7 Vassilios Karakoidas
41 cf2a3529 Christos Stavrakakis
# Fix path to import synnefo settings
42 d08a5f6f Vangelis Koukis
import sys
43 86221fd5 Panos Louridas
import os
44 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45 86221fd5 Panos Louridas
sys.path.append(path)
46 d08a5f6f Vangelis Koukis
47 de470b1e Kostas Papadimitriou
from synnefo import settings
48 d08a5f6f Vangelis Koukis
setup_environ(settings)
49 d08a5f6f Vangelis Koukis
50 9e98ba3c Giorgos Verigakis
import logging
51 8d8ea051 Georgios Gousios
import time
52 4ed2e471 Georgios Gousios
53 cf2a3529 Christos Stavrakakis
import daemon.runner
54 cf2a3529 Christos Stavrakakis
import daemon.daemon
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 cf2a3529 Christos Stavrakakis
from signal import signal, SIGINT, SIGTERM
57 c4e55622 Christos Stavrakakis
58 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
59 4ed2e471 Georgios Gousios
try:
60 cf2a3529 Christos Stavrakakis
    from daemon import pidfile as pidlockfile
61 4ed2e471 Georgios Gousios
except:
62 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
63 8d8ea051 Georgios Gousios
64 cf2a3529 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
65 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
66 9e98ba3c Giorgos Verigakis
from synnefo.util.dictconfig import dictConfig
67 9e98ba3c Giorgos Verigakis
68 9e98ba3c Giorgos Verigakis
69 9e98ba3c Giorgos Verigakis
log = logging.getLogger()
70 9e98ba3c Giorgos Verigakis
71 c99fe4c7 Vassilios Karakoidas
72 698d0666 Georgios Gousios
# Queue names
73 698d0666 Georgios Gousios
QUEUES = []
74 698d0666 Georgios Gousios
75 698d0666 Georgios Gousios
# Queue bindings to exchanges
76 698d0666 Georgios Gousios
BINDINGS = []
77 698d0666 Georgios Gousios
78 2bf8d695 Vangelis Koukis
79 78e2d194 Georgios Gousios
class Dispatcher:
80 5d081749 Georgios Gousios
    debug = False
81 c4e55622 Christos Stavrakakis
    client_promises = []
82 78e2d194 Georgios Gousios
83 2bf8d695 Vangelis Koukis
    def __init__(self, debug=False):
84 5d081749 Georgios Gousios
        self.debug = debug
85 5d081749 Georgios Gousios
        self._init()
86 da102335 Georgios Gousios
87 78e2d194 Georgios Gousios
    def wait(self):
88 c4e55622 Christos Stavrakakis
        log.info("Waiting for messages..")
89 78e2d194 Georgios Gousios
        while True:
90 78e2d194 Georgios Gousios
            try:
91 c4e55622 Christos Stavrakakis
                self.client.basic_wait()
92 78e2d194 Georgios Gousios
            except SystemExit:
93 78e2d194 Georgios Gousios
                break
94 c4e55622 Christos Stavrakakis
            except Exception as e:
95 c4e55622 Christos Stavrakakis
                log.exception("Caught unexpected exception: %s", e)
96 c4e55622 Christos Stavrakakis
97 c4e55622 Christos Stavrakakis
        self.client.basic_cancel()
98 c4e55622 Christos Stavrakakis
        self.client.close()
99 78e2d194 Georgios Gousios
100 5d081749 Georgios Gousios
    def _init(self):
101 698d0666 Georgios Gousios
        global QUEUES, BINDINGS
102 9e98ba3c Giorgos Verigakis
        log.info("Initializing")
103 226f086a Georgios Gousios
104 c4e55622 Christos Stavrakakis
        self.client = AMQPClient()
105 c4e55622 Christos Stavrakakis
        # Connect to AMQP host
106 c4e55622 Christos Stavrakakis
        self.client.connect()
107 78e2d194 Georgios Gousios
108 c183005e Georgios Gousios
        # Declare queues and exchanges
109 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
110 db400d82 Christos Stavrakakis
            self.client.exchange_declare(exchange=exchange,
111 db400d82 Christos Stavrakakis
                                         type="topic")
112 f30730c0 Georgios Gousios
113 226f086a Georgios Gousios
        for queue in QUEUES:
114 c4e55622 Christos Stavrakakis
            # Queues are mirrored to all RabbitMQ brokers
115 db400d82 Christos Stavrakakis
            self.client.queue_declare(queue=queue, mirrored=True)
116 78e2d194 Georgios Gousios
117 226f086a Georgios Gousios
        bindings = BINDINGS
118 78e2d194 Georgios Gousios
119 c183005e Georgios Gousios
        # Bind queues to handler methods
120 5d081749 Georgios Gousios
        for binding in bindings:
121 78e2d194 Georgios Gousios
            try:
122 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
123 23c84263 Georgios Gousios
            except AttributeError:
124 9e98ba3c Giorgos Verigakis
                log.error("Cannot find callback %s", binding[3])
125 e6f5bb10 Vangelis Koukis
                raise SystemExit(1)
126 8d8ea051 Georgios Gousios
127 c4e55622 Christos Stavrakakis
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
128 c4e55622 Christos Stavrakakis
                                   routing_key=binding[2])
129 c4e55622 Christos Stavrakakis
130 c4e55622 Christos Stavrakakis
            consume_promise = self.client.basic_consume(queue=binding[0],
131 c4e55622 Christos Stavrakakis
                                                        callback=callback)
132 c4e55622 Christos Stavrakakis
133 9e98ba3c Giorgos Verigakis
            log.debug("Binding %s(%s) to queue %s with handler %s",
134 c4e55622 Christos Stavrakakis
                      binding[1], binding[2], binding[0], binding[3])
135 c4e55622 Christos Stavrakakis
            self.client_promises.append(consume_promise)
136 8d8ea051 Georgios Gousios
137 c183005e Georgios Gousios
138 698d0666 Georgios Gousios
def _init_queues():
139 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
140 698d0666 Georgios Gousios
141 698d0666 Georgios Gousios
    # Queue declarations
142 698d0666 Georgios Gousios
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
143 698d0666 Georgios Gousios
144 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
145 a17a8e98 Christos Stavrakakis
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
146 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
147 9068cd85 Georgios Gousios
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
148 698d0666 Georgios Gousios
    QUEUE_RECONC = "%s-reconciliation" % prefix
149 698d0666 Georgios Gousios
    if settings.DEBUG is True:
150 b47b895d Christos Stavrakakis
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
151 698d0666 Georgios Gousios
152 a17a8e98 Christos Stavrakakis
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
153 fd56d250 Giorgos Verigakis
              QUEUE_GANETI_BUILD_PROGR)
154 698d0666 Georgios Gousios
155 698d0666 Georgios Gousios
    # notifications of type "ganeti-op-status"
156 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
157 a17a8e98 Christos Stavrakakis
    # notifications of type "ganeti-network-status"
158 a17a8e98 Christos Stavrakakis
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
159 698d0666 Georgios Gousios
    # notifications of type "ganeti-net-status"
160 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
161 e6f5bb10 Vangelis Koukis
    # notifications of type "ganeti-create-progress"
162 e6f5bb10 Vangelis Koukis
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
163 0230cd3a Vangelis Koukis
    # reconciliation
164 633d9cfa Georgios Gousios
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
165 698d0666 Georgios Gousios
166 698d0666 Georgios Gousios
    BINDINGS = [
167 9068cd85 Georgios Gousios
    # Queue                   # Exchange                # RouteKey              # Handler
168 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
169 a17a8e98 Christos Stavrakakis
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
170 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
171 cf2a3529 Christos Stavrakakis
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
172 698d0666 Georgios Gousios
    ]
173 698d0666 Georgios Gousios
174 698d0666 Georgios Gousios
    if settings.DEBUG is True:
175 698d0666 Georgios Gousios
        BINDINGS += [
176 698d0666 Georgios Gousios
            # Queue       # Exchange          # RouteKey  # Handler
177 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
178 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
179 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
180 698d0666 Georgios Gousios
        ]
181 698d0666 Georgios Gousios
        QUEUES += (QUEUE_DEBUG,)
182 698d0666 Georgios Gousios
183 698d0666 Georgios Gousios
184 c183005e Georgios Gousios
def _exit_handler(signum, frame):
185 d28244af Vangelis Koukis
    """"Catch exit signal in children processes"""
186 9e98ba3c Giorgos Verigakis
    log.info("Caught signal %d, will raise SystemExit", signum)
187 8d8ea051 Georgios Gousios
    raise SystemExit
188 8d8ea051 Georgios Gousios
189 c183005e Georgios Gousios
190 c183005e Georgios Gousios
def _parent_handler(signum, frame):
191 c183005e Georgios Gousios
    """"Catch exit signal in parent process and forward it to children."""
192 9e98ba3c Giorgos Verigakis
    global children
193 9e98ba3c Giorgos Verigakis
    log.info("Caught signal %d, sending SIGTERM to children %s",
194 cf2a3529 Christos Stavrakakis
             signum, children)
195 8861126f Georgios Gousios
    [os.kill(pid, SIGTERM) for pid in children]
196 8861126f Georgios Gousios
197 c183005e Georgios Gousios
198 57d0082a Georgios Gousios
def child(cmdline):
199 c183005e Georgios Gousios
    """The context of the child process"""
200 c183005e Georgios Gousios
201 c183005e Georgios Gousios
    # Cmd line argument parsing
202 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(cmdline)
203 d28244af Vangelis Koukis
    disp = Dispatcher(debug=opts.debug)
204 78e2d194 Georgios Gousios
205 c183005e Georgios Gousios
    # Start the event loop
206 2cd99e7a Georgios Gousios
    disp.wait()
207 78e2d194 Georgios Gousios
208 c183005e Georgios Gousios
209 78e2d194 Georgios Gousios
def parse_arguments(args):
210 78e2d194 Georgios Gousios
    from optparse import OptionParser
211 78e2d194 Georgios Gousios
212 cf2a3529 Christos Stavrakakis
    default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid")
213 78e2d194 Georgios Gousios
    parser = OptionParser()
214 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
215 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
216 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
217 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
218 3d975c75 Kostas Papadimitriou
    parser.add_option("-p", "--pid-file", dest="pid_file",
219 3d975c75 Kostas Papadimitriou
                      default=default_pid_file,
220 3d975c75 Kostas Papadimitriou
                      help="Save PID to file (default: %s)" % default_pid_file)
221 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
222 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
223 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
224 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
225 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
226 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
227 979482ce Georgios Gousios
                           first (DANGEROUS!)")
228 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
229 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
230 de081774 Georgios Gousios
231 78e2d194 Georgios Gousios
    return parser.parse_args(args)
232 78e2d194 Georgios Gousios
233 f30730c0 Georgios Gousios
234 d28244af Vangelis Koukis
def purge_queues():
235 979482ce Georgios Gousios
    """
236 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
237 979482ce Georgios Gousios
    """
238 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
239 c4e55622 Christos Stavrakakis
    client = AMQPClient()
240 c4e55622 Christos Stavrakakis
    client.connect()
241 f30730c0 Georgios Gousios
242 698d0666 Georgios Gousios
    print "Queues to be deleted: ", QUEUES
243 f30730c0 Georgios Gousios
244 979482ce Georgios Gousios
    if not get_user_confirmation():
245 f30730c0 Georgios Gousios
        return
246 f30730c0 Georgios Gousios
247 698d0666 Georgios Gousios
    for queue in QUEUES:
248 c4e55622 Christos Stavrakakis
        result = client.queue_delete(queue=queue)
249 c4e55622 Christos Stavrakakis
        print "Deleting queue %s. Result: %s" % (queue, result)
250 979482ce Georgios Gousios
251 c4e55622 Christos Stavrakakis
    client.close()
252 979482ce Georgios Gousios
253 979482ce Georgios Gousios
254 979482ce Georgios Gousios
def purge_exchanges():
255 e6f5bb10 Vangelis Koukis
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
256 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
257 979482ce Georgios Gousios
    purge_queues()
258 979482ce Georgios Gousios
259 c4e55622 Christos Stavrakakis
    client = AMQPClient()
260 c4e55622 Christos Stavrakakis
    client.connect()
261 979482ce Georgios Gousios
262 e6f5bb10 Vangelis Koukis
    print "Exchanges to be deleted: ", settings.EXCHANGES
263 979482ce Georgios Gousios
264 979482ce Georgios Gousios
    if not get_user_confirmation():
265 979482ce Georgios Gousios
        return
266 979482ce Georgios Gousios
267 979482ce Georgios Gousios
    for exchange in settings.EXCHANGES:
268 c4e55622 Christos Stavrakakis
        result = client.exchange_delete(exchange=exchange)
269 c4e55622 Christos Stavrakakis
        print "Deleting exchange %s. Result: %s" % (exchange, result)
270 979482ce Georgios Gousios
271 c4e55622 Christos Stavrakakis
    client.close()
272 f30730c0 Georgios Gousios
273 c183005e Georgios Gousios
274 979482ce Georgios Gousios
def drain_queue(queue):
275 e6f5bb10 Vangelis Koukis
    """Strip a (declared) queue from all outstanding messages"""
276 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
277 979482ce Georgios Gousios
    if not queue:
278 979482ce Georgios Gousios
        return
279 979482ce Georgios Gousios
280 698d0666 Georgios Gousios
    if not queue in QUEUES:
281 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
282 979482ce Georgios Gousios
        return
283 979482ce Georgios Gousios
284 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
285 979482ce Georgios Gousios
286 979482ce Georgios Gousios
    if not get_user_confirmation():
287 979482ce Georgios Gousios
        return
288 c4e55622 Christos Stavrakakis
289 c4e55622 Christos Stavrakakis
    client = AMQPClient()
290 c4e55622 Christos Stavrakakis
    client.connect()
291 979482ce Georgios Gousios
292 e6209aa2 Georgios Gousios
    # Register a temporary queue binding
293 698d0666 Georgios Gousios
    for binding in BINDINGS:
294 e6209aa2 Georgios Gousios
        if binding[0] == queue:
295 e6209aa2 Georgios Gousios
            exch = binding[1]
296 e6209aa2 Georgios Gousios
297 c4e55622 Christos Stavrakakis
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
298 e6209aa2 Georgios Gousios
299 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
300 e6209aa2 Georgios Gousios
    time.sleep(2)
301 e6209aa2 Georgios Gousios
    print "Queue draining starting"
302 e6209aa2 Georgios Gousios
303 e6209aa2 Georgios Gousios
    signal(SIGTERM, _exit_handler)
304 e6209aa2 Georgios Gousios
    signal(SIGINT, _exit_handler)
305 e6209aa2 Georgios Gousios
306 c626e1d0 Georgios Gousios
    num_processed = 0
307 e6209aa2 Georgios Gousios
    while True:
308 c4e55622 Christos Stavrakakis
        client.basic_wait()
309 c626e1d0 Georgios Gousios
        num_processed += 1
310 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
311 c626e1d0 Georgios Gousios
312 c4e55622 Christos Stavrakakis
    client.basic_cancel(tag)
313 c4e55622 Christos Stavrakakis
    client.close()
314 95aee02c Vangelis Koukis
315 979482ce Georgios Gousios
316 95aee02c Vangelis Koukis
317 979482ce Georgios Gousios
def get_user_confirmation():
318 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
319 979482ce Georgios Gousios
320 979482ce Georgios Gousios
    if not ans:
321 979482ce Georgios Gousios
        return False
322 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
323 979482ce Georgios Gousios
        return False
324 979482ce Georgios Gousios
    return True
325 979482ce Georgios Gousios
326 979482ce Georgios Gousios
327 57d0082a Georgios Gousios
def debug_mode():
328 2bf8d695 Vangelis Koukis
    disp = Dispatcher(debug=True)
329 838239fa Georgios Gousios
    signal(SIGINT, _exit_handler)
330 838239fa Georgios Gousios
    signal(SIGTERM, _exit_handler)
331 838239fa Georgios Gousios
332 838239fa Georgios Gousios
    disp.wait()
333 838239fa Georgios Gousios
334 838239fa Georgios Gousios
335 d28244af Vangelis Koukis
def daemon_mode(opts):
336 9e98ba3c Giorgos Verigakis
    global children
337 698d0666 Georgios Gousios
338 d28244af Vangelis Koukis
    # Create pidfile,
339 cf2a3529 Christos Stavrakakis
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
340 4ed2e471 Georgios Gousios
341 cf2a3529 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
342 cf2a3529 Christos Stavrakakis
        log.warning("Removing stale PID lock file %s", pidf.path)
343 cf2a3529 Christos Stavrakakis
        pidf.break_lock()
344 cf2a3529 Christos Stavrakakis
345 cf2a3529 Christos Stavrakakis
    try:
346 cf2a3529 Christos Stavrakakis
        pidf.acquire()
347 cf2a3529 Christos Stavrakakis
    except (pidlockfile.AlreadyLocked, LockTimeout):
348 cf2a3529 Christos Stavrakakis
        log.critical("Failed to lock pidfile %s, another instance running?",
349 cf2a3529 Christos Stavrakakis
                     pidf.path)
350 cf2a3529 Christos Stavrakakis
        sys.exit(1)
351 de081774 Georgios Gousios
352 9e98ba3c Giorgos Verigakis
    log.info("Became a daemon")
353 57d0082a Georgios Gousios
354 c183005e Georgios Gousios
    # Fork workers
355 8861126f Georgios Gousios
    children = []
356 8861126f Georgios Gousios
357 8861126f Georgios Gousios
    i = 0
358 8861126f Georgios Gousios
    while i < opts.workers:
359 8861126f Georgios Gousios
        newpid = os.fork()
360 8861126f Georgios Gousios
361 8861126f Georgios Gousios
        if newpid == 0:
362 d28244af Vangelis Koukis
            signal(SIGINT, _exit_handler)
363 c183005e Georgios Gousios
            signal(SIGTERM, _exit_handler)
364 57d0082a Georgios Gousios
            child(sys.argv[1:])
365 4dc0b46a Georgios Gousios
            sys.exit(1)
366 8861126f Georgios Gousios
        else:
367 9e98ba3c Giorgos Verigakis
            log.debug("%d, forked child: %d", os.getpid(), newpid)
368 9e98ba3c Giorgos Verigakis
            children.append(newpid)
369 8861126f Georgios Gousios
        i += 1
370 8861126f Georgios Gousios
371 8d8ea051 Georgios Gousios
    # Catch signals to ensure graceful shutdown
372 d28244af Vangelis Koukis
    signal(SIGINT, _parent_handler)
373 c183005e Georgios Gousios
    signal(SIGTERM, _parent_handler)
374 8861126f Georgios Gousios
375 57d0082a Georgios Gousios
    # Wait for all children processes to die, one by one
376 d28244af Vangelis Koukis
    try:
377 de081774 Georgios Gousios
        for pid in children:
378 de081774 Georgios Gousios
            try:
379 de081774 Georgios Gousios
                os.waitpid(pid, 0)
380 de081774 Georgios Gousios
            except Exception:
381 de081774 Georgios Gousios
                pass
382 de081774 Georgios Gousios
    finally:
383 de081774 Georgios Gousios
        pidf.release()
384 c183005e Georgios Gousios
385 d28244af Vangelis Koukis
386 d28244af Vangelis Koukis
def main():
387 cf2a3529 Christos Stavrakakis
    (opts, args) = parse_arguments(sys.argv[1:])
388 cf2a3529 Christos Stavrakakis
389 3d975c75 Kostas Papadimitriou
    dictConfig(settings.DISPATCHER_LOGGING)
390 3d975c75 Kostas Papadimitriou
391 9e98ba3c Giorgos Verigakis
    global log
392 3d975c75 Kostas Papadimitriou
393 d28244af Vangelis Koukis
    # Init the global variables containing the queues
394 d28244af Vangelis Koukis
    _init_queues()
395 d28244af Vangelis Koukis
396 d28244af Vangelis Koukis
    # Special case for the clean up queues action
397 d28244af Vangelis Koukis
    if opts.purge_queues:
398 d28244af Vangelis Koukis
        purge_queues()
399 d28244af Vangelis Koukis
        return
400 d28244af Vangelis Koukis
401 d28244af Vangelis Koukis
    # Special case for the clean up exch action
402 d28244af Vangelis Koukis
    if opts.purge_exchanges:
403 d28244af Vangelis Koukis
        purge_exchanges()
404 d28244af Vangelis Koukis
        return
405 d28244af Vangelis Koukis
406 d28244af Vangelis Koukis
    if opts.drain_queue:
407 d28244af Vangelis Koukis
        drain_queue(opts.drain_queue)
408 d28244af Vangelis Koukis
        return
409 d28244af Vangelis Koukis
410 d28244af Vangelis Koukis
    # Debug mode, process messages without spawning workers
411 d28244af Vangelis Koukis
    if opts.debug:
412 d28244af Vangelis Koukis
        debug_mode()
413 d28244af Vangelis Koukis
        return
414 7c62bd54 Kostas Papadimitriou
415 9e98ba3c Giorgos Verigakis
    files_preserve = []
416 9e98ba3c Giorgos Verigakis
    for handler in log.handlers:
417 9e98ba3c Giorgos Verigakis
        stream = getattr(handler, 'stream')
418 9e98ba3c Giorgos Verigakis
        if stream and hasattr(stream, 'fileno'):
419 9e98ba3c Giorgos Verigakis
            files_preserve.append(handler.stream)
420 7c62bd54 Kostas Papadimitriou
421 cf2a3529 Christos Stavrakakis
    daemon_context = daemon.daemon.DaemonContext(
422 d28244af Vangelis Koukis
        files_preserve=files_preserve,
423 d28244af Vangelis Koukis
        umask=022)
424 7c62bd54 Kostas Papadimitriou
425 d28244af Vangelis Koukis
    daemon_context.open()
426 d28244af Vangelis Koukis
427 d28244af Vangelis Koukis
    # Catch every exception, make sure it gets logged properly
428 d28244af Vangelis Koukis
    try:
429 d28244af Vangelis Koukis
        daemon_mode(opts)
430 d28244af Vangelis Koukis
    except Exception:
431 9e98ba3c Giorgos Verigakis
        log.exception("Unknown error")
432 d28244af Vangelis Koukis
        raise
433 d28244af Vangelis Koukis
434 d28244af Vangelis Koukis
435 7c62bd54 Kostas Papadimitriou
if __name__ == "__main__":
436 3d975c75 Kostas Papadimitriou
    sys.exit(main())
437 7c62bd54 Kostas Papadimitriou
438 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :