Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 8ec69269

History | View | Annotate | Download (12.2 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
40 c99fe4c7 Vassilios Karakoidas
41 cf2a3529 Christos Stavrakakis
# Fix path to import synnefo settings
42 d08a5f6f Vangelis Koukis
import sys
43 86221fd5 Panos Louridas
import os
44 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45 86221fd5 Panos Louridas
sys.path.append(path)
46 de470b1e Kostas Papadimitriou
from synnefo import settings
47 d08a5f6f Vangelis Koukis
setup_environ(settings)
48 d08a5f6f Vangelis Koukis
49 9e98ba3c Giorgos Verigakis
import logging
50 8d8ea051 Georgios Gousios
import time
51 4ed2e471 Georgios Gousios
52 3f018af1 Christos Stavrakakis
import daemon
53 cf2a3529 Christos Stavrakakis
import daemon.runner
54 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
55 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
56 4ed2e471 Georgios Gousios
try:
57 cf2a3529 Christos Stavrakakis
    from daemon import pidfile as pidlockfile
58 4ed2e471 Georgios Gousios
except:
59 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
60 6c9c95d8 Christos Stavrakakis
import setproctitle
61 8d8ea051 Georgios Gousios
62 cf2a3529 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
63 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
64 9e98ba3c Giorgos Verigakis
65 3f018af1 Christos Stavrakakis
from synnefo.util.dictconfig import dictConfig
66 3f018af1 Christos Stavrakakis
dictConfig(settings.DISPATCHER_LOGGING)
67 9e98ba3c Giorgos Verigakis
log = logging.getLogger()
68 9e98ba3c Giorgos Verigakis
69 698d0666 Georgios Gousios
# Queue names
70 698d0666 Georgios Gousios
QUEUES = []
71 698d0666 Georgios Gousios
72 698d0666 Georgios Gousios
# Queue bindings to exchanges
73 698d0666 Georgios Gousios
BINDINGS = []
74 698d0666 Georgios Gousios
75 2bf8d695 Vangelis Koukis
76 78e2d194 Georgios Gousios
class Dispatcher:
77 5d081749 Georgios Gousios
    debug = False
78 78e2d194 Georgios Gousios
79 2bf8d695 Vangelis Koukis
    def __init__(self, debug=False):
80 5d081749 Georgios Gousios
        self.debug = debug
81 5d081749 Georgios Gousios
        self._init()
82 da102335 Georgios Gousios
83 78e2d194 Georgios Gousios
    def wait(self):
84 c4e55622 Christos Stavrakakis
        log.info("Waiting for messages..")
85 78e2d194 Georgios Gousios
        while True:
86 78e2d194 Georgios Gousios
            try:
87 c4e55622 Christos Stavrakakis
                self.client.basic_wait()
88 78e2d194 Georgios Gousios
            except SystemExit:
89 78e2d194 Georgios Gousios
                break
90 c4e55622 Christos Stavrakakis
            except Exception as e:
91 c4e55622 Christos Stavrakakis
                log.exception("Caught unexpected exception: %s", e)
92 c4e55622 Christos Stavrakakis
93 c4e55622 Christos Stavrakakis
        self.client.basic_cancel()
94 c4e55622 Christos Stavrakakis
        self.client.close()
95 78e2d194 Georgios Gousios
96 5d081749 Georgios Gousios
    def _init(self):
97 698d0666 Georgios Gousios
        global QUEUES, BINDINGS
98 9e98ba3c Giorgos Verigakis
        log.info("Initializing")
99 226f086a Georgios Gousios
100 c4e55622 Christos Stavrakakis
        self.client = AMQPClient()
101 c4e55622 Christos Stavrakakis
        # Connect to AMQP host
102 c4e55622 Christos Stavrakakis
        self.client.connect()
103 78e2d194 Georgios Gousios
104 c183005e Georgios Gousios
        # Declare queues and exchanges
105 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
106 db400d82 Christos Stavrakakis
            self.client.exchange_declare(exchange=exchange,
107 db400d82 Christos Stavrakakis
                                         type="topic")
108 f30730c0 Georgios Gousios
109 226f086a Georgios Gousios
        for queue in QUEUES:
110 c4e55622 Christos Stavrakakis
            # Queues are mirrored to all RabbitMQ brokers
111 db400d82 Christos Stavrakakis
            self.client.queue_declare(queue=queue, mirrored=True)
112 78e2d194 Georgios Gousios
113 226f086a Georgios Gousios
        bindings = BINDINGS
114 78e2d194 Georgios Gousios
115 c183005e Georgios Gousios
        # Bind queues to handler methods
116 5d081749 Georgios Gousios
        for binding in bindings:
117 78e2d194 Georgios Gousios
            try:
118 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
119 23c84263 Georgios Gousios
            except AttributeError:
120 9e98ba3c Giorgos Verigakis
                log.error("Cannot find callback %s", binding[3])
121 e6f5bb10 Vangelis Koukis
                raise SystemExit(1)
122 8d8ea051 Georgios Gousios
123 c4e55622 Christos Stavrakakis
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
124 c4e55622 Christos Stavrakakis
                                   routing_key=binding[2])
125 c4e55622 Christos Stavrakakis
126 370f69ec Christos Stavrakakis
            self.client.basic_consume(queue=binding[0],
127 c4e55622 Christos Stavrakakis
                                                        callback=callback)
128 c4e55622 Christos Stavrakakis
129 9e98ba3c Giorgos Verigakis
            log.debug("Binding %s(%s) to queue %s with handler %s",
130 c4e55622 Christos Stavrakakis
                      binding[1], binding[2], binding[0], binding[3])
131 8d8ea051 Georgios Gousios
132 c183005e Georgios Gousios
133 698d0666 Georgios Gousios
def _init_queues():
134 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
135 698d0666 Georgios Gousios
136 698d0666 Georgios Gousios
    # Queue declarations
137 698d0666 Georgios Gousios
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
138 698d0666 Georgios Gousios
139 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
140 a17a8e98 Christos Stavrakakis
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
141 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
142 9068cd85 Georgios Gousios
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
143 698d0666 Georgios Gousios
    QUEUE_RECONC = "%s-reconciliation" % prefix
144 698d0666 Georgios Gousios
    if settings.DEBUG is True:
145 b47b895d Christos Stavrakakis
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
146 698d0666 Georgios Gousios
147 a17a8e98 Christos Stavrakakis
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
148 fd56d250 Giorgos Verigakis
              QUEUE_GANETI_BUILD_PROGR)
149 698d0666 Georgios Gousios
150 698d0666 Georgios Gousios
    # notifications of type "ganeti-op-status"
151 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
152 a17a8e98 Christos Stavrakakis
    # notifications of type "ganeti-network-status"
153 a17a8e98 Christos Stavrakakis
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
154 698d0666 Georgios Gousios
    # notifications of type "ganeti-net-status"
155 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
156 e6f5bb10 Vangelis Koukis
    # notifications of type "ganeti-create-progress"
157 e6f5bb10 Vangelis Koukis
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
158 0230cd3a Vangelis Koukis
    # reconciliation
159 633d9cfa Georgios Gousios
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
160 698d0666 Georgios Gousios
161 698d0666 Georgios Gousios
    BINDINGS = [
162 9068cd85 Georgios Gousios
    # Queue                   # Exchange                # RouteKey              # Handler
163 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
164 a17a8e98 Christos Stavrakakis
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
165 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
166 cf2a3529 Christos Stavrakakis
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
167 698d0666 Georgios Gousios
    ]
168 698d0666 Georgios Gousios
169 698d0666 Georgios Gousios
    if settings.DEBUG is True:
170 698d0666 Georgios Gousios
        BINDINGS += [
171 698d0666 Georgios Gousios
            # Queue       # Exchange          # RouteKey  # Handler
172 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
173 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
174 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
175 698d0666 Georgios Gousios
        ]
176 698d0666 Georgios Gousios
        QUEUES += (QUEUE_DEBUG,)
177 698d0666 Georgios Gousios
178 698d0666 Georgios Gousios
179 78e2d194 Georgios Gousios
def parse_arguments(args):
180 78e2d194 Georgios Gousios
    from optparse import OptionParser
181 78e2d194 Georgios Gousios
182 3f018af1 Christos Stavrakakis
    default_pid_file = \
183 3f018af1 Christos Stavrakakis
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
184 78e2d194 Georgios Gousios
    parser = OptionParser()
185 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
186 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
187 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
188 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
189 3d975c75 Kostas Papadimitriou
    parser.add_option("-p", "--pid-file", dest="pid_file",
190 3d975c75 Kostas Papadimitriou
                      default=default_pid_file,
191 3d975c75 Kostas Papadimitriou
                      help="Save PID to file (default: %s)" % default_pid_file)
192 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
193 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
194 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
195 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
196 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
197 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
198 979482ce Georgios Gousios
                           first (DANGEROUS!)")
199 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
200 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
201 de081774 Georgios Gousios
202 78e2d194 Georgios Gousios
    return parser.parse_args(args)
203 78e2d194 Georgios Gousios
204 f30730c0 Georgios Gousios
205 d28244af Vangelis Koukis
def purge_queues():
206 979482ce Georgios Gousios
    """
207 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
208 979482ce Georgios Gousios
    """
209 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
210 c4e55622 Christos Stavrakakis
    client = AMQPClient()
211 c4e55622 Christos Stavrakakis
    client.connect()
212 f30730c0 Georgios Gousios
213 698d0666 Georgios Gousios
    print "Queues to be deleted: ", QUEUES
214 f30730c0 Georgios Gousios
215 979482ce Georgios Gousios
    if not get_user_confirmation():
216 f30730c0 Georgios Gousios
        return
217 f30730c0 Georgios Gousios
218 698d0666 Georgios Gousios
    for queue in QUEUES:
219 c4e55622 Christos Stavrakakis
        result = client.queue_delete(queue=queue)
220 c4e55622 Christos Stavrakakis
        print "Deleting queue %s. Result: %s" % (queue, result)
221 979482ce Georgios Gousios
222 c4e55622 Christos Stavrakakis
    client.close()
223 979482ce Georgios Gousios
224 979482ce Georgios Gousios
225 979482ce Georgios Gousios
def purge_exchanges():
226 e6f5bb10 Vangelis Koukis
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
227 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
228 979482ce Georgios Gousios
    purge_queues()
229 979482ce Georgios Gousios
230 c4e55622 Christos Stavrakakis
    client = AMQPClient()
231 c4e55622 Christos Stavrakakis
    client.connect()
232 979482ce Georgios Gousios
233 e6f5bb10 Vangelis Koukis
    print "Exchanges to be deleted: ", settings.EXCHANGES
234 979482ce Georgios Gousios
235 979482ce Georgios Gousios
    if not get_user_confirmation():
236 979482ce Georgios Gousios
        return
237 979482ce Georgios Gousios
238 979482ce Georgios Gousios
    for exchange in settings.EXCHANGES:
239 c4e55622 Christos Stavrakakis
        result = client.exchange_delete(exchange=exchange)
240 c4e55622 Christos Stavrakakis
        print "Deleting exchange %s. Result: %s" % (exchange, result)
241 979482ce Georgios Gousios
242 c4e55622 Christos Stavrakakis
    client.close()
243 f30730c0 Georgios Gousios
244 c183005e Georgios Gousios
245 979482ce Georgios Gousios
def drain_queue(queue):
246 e6f5bb10 Vangelis Koukis
    """Strip a (declared) queue from all outstanding messages"""
247 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
248 979482ce Georgios Gousios
    if not queue:
249 979482ce Georgios Gousios
        return
250 979482ce Georgios Gousios
251 698d0666 Georgios Gousios
    if not queue in QUEUES:
252 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
253 979482ce Georgios Gousios
        return
254 979482ce Georgios Gousios
255 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
256 979482ce Georgios Gousios
257 979482ce Georgios Gousios
    if not get_user_confirmation():
258 979482ce Georgios Gousios
        return
259 c4e55622 Christos Stavrakakis
260 c4e55622 Christos Stavrakakis
    client = AMQPClient()
261 c4e55622 Christos Stavrakakis
    client.connect()
262 979482ce Georgios Gousios
263 c4e55622 Christos Stavrakakis
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
264 e6209aa2 Georgios Gousios
265 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
266 e6209aa2 Georgios Gousios
    time.sleep(2)
267 e6209aa2 Georgios Gousios
    print "Queue draining starting"
268 e6209aa2 Georgios Gousios
269 c626e1d0 Georgios Gousios
    num_processed = 0
270 e6209aa2 Georgios Gousios
    while True:
271 c4e55622 Christos Stavrakakis
        client.basic_wait()
272 c626e1d0 Georgios Gousios
        num_processed += 1
273 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
274 c626e1d0 Georgios Gousios
275 c4e55622 Christos Stavrakakis
    client.basic_cancel(tag)
276 c4e55622 Christos Stavrakakis
    client.close()
277 95aee02c Vangelis Koukis
278 979482ce Georgios Gousios
279 979482ce Georgios Gousios
def get_user_confirmation():
280 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
281 979482ce Georgios Gousios
282 979482ce Georgios Gousios
    if not ans:
283 979482ce Georgios Gousios
        return False
284 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
285 979482ce Georgios Gousios
        return False
286 979482ce Georgios Gousios
    return True
287 979482ce Georgios Gousios
288 979482ce Georgios Gousios
289 57d0082a Georgios Gousios
def debug_mode():
290 2bf8d695 Vangelis Koukis
    disp = Dispatcher(debug=True)
291 838239fa Georgios Gousios
    disp.wait()
292 838239fa Georgios Gousios
293 838239fa Georgios Gousios
294 d28244af Vangelis Koukis
def daemon_mode(opts):
295 3f018af1 Christos Stavrakakis
    disp = Dispatcher(debug=False)
296 3f018af1 Christos Stavrakakis
    disp.wait()
297 c183005e Georgios Gousios
298 d28244af Vangelis Koukis
299 d28244af Vangelis Koukis
def main():
300 cf2a3529 Christos Stavrakakis
    (opts, args) = parse_arguments(sys.argv[1:])
301 cf2a3529 Christos Stavrakakis
302 6c9c95d8 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
303 6c9c95d8 Christos Stavrakakis
    # executable.  Can not seperate command-line arguments from actual name of
304 6c9c95d8 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
305 6c9c95d8 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
306 6c9c95d8 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
307 6c9c95d8 Christos Stavrakakis
308 8ec69269 Christos Stavrakakis
    if opts.debug:
309 8ec69269 Christos Stavrakakis
        stream_handler = logging.StreamHandler()
310 8ec69269 Christos Stavrakakis
        formatter = logging.Formatter("%(asctime)s %(module)s %(levelname)s: %(message)s",
311 8ec69269 Christos Stavrakakis
                                      "%Y-%m-%d %H:%M:%S")
312 8ec69269 Christos Stavrakakis
        stream_handler.setFormatter(formatter)
313 8ec69269 Christos Stavrakakis
        log.addHandler(stream_handler)
314 8ec69269 Christos Stavrakakis
315 d28244af Vangelis Koukis
    # Init the global variables containing the queues
316 d28244af Vangelis Koukis
    _init_queues()
317 d28244af Vangelis Koukis
318 d28244af Vangelis Koukis
    # Special case for the clean up queues action
319 d28244af Vangelis Koukis
    if opts.purge_queues:
320 d28244af Vangelis Koukis
        purge_queues()
321 d28244af Vangelis Koukis
        return
322 d28244af Vangelis Koukis
323 d28244af Vangelis Koukis
    # Special case for the clean up exch action
324 d28244af Vangelis Koukis
    if opts.purge_exchanges:
325 d28244af Vangelis Koukis
        purge_exchanges()
326 d28244af Vangelis Koukis
        return
327 d28244af Vangelis Koukis
328 d28244af Vangelis Koukis
    if opts.drain_queue:
329 d28244af Vangelis Koukis
        drain_queue(opts.drain_queue)
330 d28244af Vangelis Koukis
        return
331 d28244af Vangelis Koukis
332 3f018af1 Christos Stavrakakis
    # Debug mode, process messages without daemonizing
333 d28244af Vangelis Koukis
    if opts.debug:
334 d28244af Vangelis Koukis
        debug_mode()
335 d28244af Vangelis Koukis
        return
336 7c62bd54 Kostas Papadimitriou
337 3f018af1 Christos Stavrakakis
    # Create pidfile,
338 3f018af1 Christos Stavrakakis
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
339 3f018af1 Christos Stavrakakis
340 3f018af1 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
341 3f018af1 Christos Stavrakakis
        log.warning("Removing stale PID lock file %s", pidf.path)
342 3f018af1 Christos Stavrakakis
        pidf.break_lock()
343 3f018af1 Christos Stavrakakis
344 9e98ba3c Giorgos Verigakis
    files_preserve = []
345 9e98ba3c Giorgos Verigakis
    for handler in log.handlers:
346 9e98ba3c Giorgos Verigakis
        stream = getattr(handler, 'stream')
347 9e98ba3c Giorgos Verigakis
        if stream and hasattr(stream, 'fileno'):
348 9e98ba3c Giorgos Verigakis
            files_preserve.append(handler.stream)
349 7c62bd54 Kostas Papadimitriou
350 3f018af1 Christos Stavrakakis
    stderr_stream = None
351 3f018af1 Christos Stavrakakis
    for handler in log.handlers:
352 3f018af1 Christos Stavrakakis
        stream = getattr(handler, 'stream')
353 3f018af1 Christos Stavrakakis
        if stream and hasattr(handler, 'baseFilename'):
354 3f018af1 Christos Stavrakakis
            stderr_stream = stream
355 3f018af1 Christos Stavrakakis
            break
356 7c62bd54 Kostas Papadimitriou
357 3f018af1 Christos Stavrakakis
    daemon_context = daemon.DaemonContext(
358 3f018af1 Christos Stavrakakis
        pidfile=pidf,
359 3f018af1 Christos Stavrakakis
        umask=0022,
360 3f018af1 Christos Stavrakakis
        stdout=stderr_stream,
361 3f018af1 Christos Stavrakakis
        stderr=stderr_stream,
362 3f018af1 Christos Stavrakakis
        files_preserve=files_preserve)
363 3f018af1 Christos Stavrakakis
364 3f018af1 Christos Stavrakakis
    try:
365 3f018af1 Christos Stavrakakis
        daemon_context.open()
366 3f018af1 Christos Stavrakakis
    except (pidlockfile.AlreadyLocked, LockTimeout):
367 3f018af1 Christos Stavrakakis
        log.critical("Failed to lock pidfile %s, another instance running?",
368 3f018af1 Christos Stavrakakis
                     pidf.path)
369 3f018af1 Christos Stavrakakis
        sys.exit(1)
370 3f018af1 Christos Stavrakakis
371 3f018af1 Christos Stavrakakis
    log.info("Became a daemon")
372 3f018af1 Christos Stavrakakis
373 3f018af1 Christos Stavrakakis
    if 'gevent' in sys.modules:
374 3f018af1 Christos Stavrakakis
        # A fork() has occured while daemonizing. If running in
375 3f018af1 Christos Stavrakakis
        # gevent context we *must* reinit gevent
376 3f018af1 Christos Stavrakakis
        log.debug("gevent imported. Reinitializing gevent")
377 3f018af1 Christos Stavrakakis
        import gevent
378 3f018af1 Christos Stavrakakis
        gevent.reinit()
379 d28244af Vangelis Koukis
380 d28244af Vangelis Koukis
    # Catch every exception, make sure it gets logged properly
381 d28244af Vangelis Koukis
    try:
382 d28244af Vangelis Koukis
        daemon_mode(opts)
383 d28244af Vangelis Koukis
    except Exception:
384 9e98ba3c Giorgos Verigakis
        log.exception("Unknown error")
385 d28244af Vangelis Koukis
        raise
386 d28244af Vangelis Koukis
387 7c62bd54 Kostas Papadimitriou
if __name__ == "__main__":
388 3d975c75 Kostas Papadimitriou
    sys.exit(main())
389 7c62bd54 Kostas Papadimitriou
390 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :