Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ d222936b

History | View | Annotate | Download (12.1 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 c99fe4c7 Vassilios Karakoidas
40 cf2a3529 Christos Stavrakakis
# Fix path to import synnefo settings
41 d08a5f6f Vangelis Koukis
import sys
42 86221fd5 Panos Louridas
import os
43 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44 86221fd5 Panos Louridas
sys.path.append(path)
45 841c26cc Georgios D. Tsoukalas
46 841c26cc Georgios D. Tsoukalas
os.environ['DJANGO_SETTINGS_MODULE'] = 'synnefo.settings'
47 0c09b1c0 Christos Stavrakakis
from django.conf import settings
48 d08a5f6f Vangelis Koukis
49 5da13d77 Vangelis Koukis
from django.db import close_connection
50 5da13d77 Vangelis Koukis
51 8d8ea051 Georgios Gousios
import time
52 4ed2e471 Georgios Gousios
53 4936f2e2 Christos Stavrakakis
import daemon
54 cf2a3529 Christos Stavrakakis
import daemon.runner
55 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
56 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
57 4ed2e471 Georgios Gousios
try:
58 cf2a3529 Christos Stavrakakis
    from daemon import pidfile as pidlockfile
59 4ed2e471 Georgios Gousios
except:
60 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
61 1dc821c9 Christos Stavrakakis
import setproctitle
62 8d8ea051 Georgios Gousios
63 cf2a3529 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
64 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
65 659de616 Christos Stavrakakis
from synnefo.logic import queues
66 9e98ba3c Giorgos Verigakis
67 d01cd522 Christos Stavrakakis
import logging
68 d222936b Christos Stavrakakis
import select
69 d222936b Christos Stavrakakis
import errno
70 fdfd8c6d Christos Stavrakakis
71 fdfd8c6d Christos Stavrakakis
log = logging.getLogger("dispatcher")
72 fdfd8c6d Christos Stavrakakis
log_amqp = logging.getLogger("amqp")
73 fdfd8c6d Christos Stavrakakis
log_logic = logging.getLogger("synnefo.logic")
74 fdfd8c6d Christos Stavrakakis
75 fdfd8c6d Christos Stavrakakis
LOGGERS = [log, log_amqp, log_logic]
76 9e98ba3c Giorgos Verigakis
77 2bf8d695 Vangelis Koukis
78 78e2d194 Georgios Gousios
class Dispatcher:
79 5d081749 Georgios Gousios
    debug = False
80 78e2d194 Georgios Gousios
81 2bf8d695 Vangelis Koukis
    def __init__(self, debug=False):
82 5d081749 Georgios Gousios
        self.debug = debug
83 5d081749 Georgios Gousios
        self._init()
84 da102335 Georgios Gousios
85 78e2d194 Georgios Gousios
    def wait(self):
86 c4e55622 Christos Stavrakakis
        log.info("Waiting for messages..")
87 b1bb9251 Christos Stavrakakis
        timeout = 600
88 78e2d194 Georgios Gousios
        while True:
89 78e2d194 Georgios Gousios
            try:
90 5da13d77 Vangelis Koukis
                # Close the Django DB connection before processing
91 5da13d77 Vangelis Koukis
                # every incoming message. This plays nicely with
92 5da13d77 Vangelis Koukis
                # DB connection pooling, if enabled and allows
93 5da13d77 Vangelis Koukis
                # the dispatcher to recover from broken connections
94 5da13d77 Vangelis Koukis
                # gracefully.
95 5da13d77 Vangelis Koukis
                close_connection()
96 b1bb9251 Christos Stavrakakis
                msg = self.client.basic_wait(timeout=timeout)
97 b1bb9251 Christos Stavrakakis
                if not msg:
98 b1bb9251 Christos Stavrakakis
                    log.warning("Idle connection for %d seconds. Will connect"
99 b1bb9251 Christos Stavrakakis
                                " to a different host. Verify that"
100 b1bb9251 Christos Stavrakakis
                                " snf-ganeti-eventd is running!!", timeout)
101 b1bb9251 Christos Stavrakakis
                    self.client.reconnect()
102 d222936b Christos Stavrakakis
            except select.error as e:
103 d222936b Christos Stavrakakis
                if e[0] != errno.EINTR:
104 d222936b Christos Stavrakakis
                    log.exception("Caught unexpected exception: %s", e)
105 d222936b Christos Stavrakakis
                else:
106 d222936b Christos Stavrakakis
                    break
107 d222936b Christos Stavrakakis
            except (SystemExit, KeyboardInterrupt):
108 ff77b897 Christos Stavrakakis
                break
109 c4e55622 Christos Stavrakakis
            except Exception as e:
110 c4e55622 Christos Stavrakakis
                log.exception("Caught unexpected exception: %s", e)
111 c4e55622 Christos Stavrakakis
112 c4e55622 Christos Stavrakakis
        self.client.basic_cancel()
113 c4e55622 Christos Stavrakakis
        self.client.close()
114 78e2d194 Georgios Gousios
115 5d081749 Georgios Gousios
    def _init(self):
116 9e98ba3c Giorgos Verigakis
        log.info("Initializing")
117 226f086a Georgios Gousios
118 a8858945 Christos Stavrakakis
        self.client = AMQPClient(logger=log_amqp)
119 c4e55622 Christos Stavrakakis
        # Connect to AMQP host
120 c4e55622 Christos Stavrakakis
        self.client.connect()
121 78e2d194 Georgios Gousios
122 c183005e Georgios Gousios
        # Declare queues and exchanges
123 659de616 Christos Stavrakakis
        exchange = settings.EXCHANGE_GANETI
124 659de616 Christos Stavrakakis
        exchange_dl = queues.convert_exchange_to_dead(exchange)
125 659de616 Christos Stavrakakis
        self.client.exchange_declare(exchange=exchange,
126 659de616 Christos Stavrakakis
                                     type="topic")
127 659de616 Christos Stavrakakis
        self.client.exchange_declare(exchange=exchange_dl,
128 659de616 Christos Stavrakakis
                                     type="topic")
129 659de616 Christos Stavrakakis
130 659de616 Christos Stavrakakis
        for queue in queues.QUEUES:
131 c4e55622 Christos Stavrakakis
            # Queues are mirrored to all RabbitMQ brokers
132 659de616 Christos Stavrakakis
            self.client.queue_declare(queue=queue, mirrored=True,
133 659de616 Christos Stavrakakis
                                      dead_letter_exchange=exchange_dl)
134 659de616 Christos Stavrakakis
            # Declare the corresponding dead-letter queue
135 659de616 Christos Stavrakakis
            queue_dl = queues.convert_queue_to_dead(queue)
136 659de616 Christos Stavrakakis
            self.client.queue_declare(queue=queue_dl, mirrored=True)
137 78e2d194 Georgios Gousios
138 c183005e Georgios Gousios
        # Bind queues to handler methods
139 659de616 Christos Stavrakakis
        for binding in queues.BINDINGS:
140 78e2d194 Georgios Gousios
            try:
141 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
142 23c84263 Georgios Gousios
            except AttributeError:
143 9e98ba3c Giorgos Verigakis
                log.error("Cannot find callback %s", binding[3])
144 e6f5bb10 Vangelis Koukis
                raise SystemExit(1)
145 659de616 Christos Stavrakakis
            queue = binding[0]
146 659de616 Christos Stavrakakis
            exchange = binding[1]
147 659de616 Christos Stavrakakis
            routing_key = binding[2]
148 8d8ea051 Georgios Gousios
149 659de616 Christos Stavrakakis
            self.client.queue_bind(queue=queue, exchange=exchange,
150 659de616 Christos Stavrakakis
                                   routing_key=routing_key)
151 c4e55622 Christos Stavrakakis
152 370f69ec Christos Stavrakakis
            self.client.basic_consume(queue=binding[0],
153 6d27eadd Christos Stavrakakis
                                      callback=callback,
154 6d27eadd Christos Stavrakakis
                                      prefetch_count=5)
155 c4e55622 Christos Stavrakakis
156 659de616 Christos Stavrakakis
            queue_dl = queues.convert_queue_to_dead(queue)
157 659de616 Christos Stavrakakis
            exchange_dl = queues.convert_exchange_to_dead(exchange)
158 659de616 Christos Stavrakakis
            # Bind the corresponding dead-letter queue
159 659de616 Christos Stavrakakis
            self.client.queue_bind(queue=queue_dl,
160 659de616 Christos Stavrakakis
                                   exchange=exchange_dl,
161 659de616 Christos Stavrakakis
                                   routing_key=routing_key)
162 659de616 Christos Stavrakakis
163 9e98ba3c Giorgos Verigakis
            log.debug("Binding %s(%s) to queue %s with handler %s",
164 659de616 Christos Stavrakakis
                      exchange, routing_key, queue, binding[3])
165 698d0666 Georgios Gousios
166 698d0666 Georgios Gousios
167 78e2d194 Georgios Gousios
def parse_arguments(args):
168 78e2d194 Georgios Gousios
    from optparse import OptionParser
169 78e2d194 Georgios Gousios
170 4936f2e2 Christos Stavrakakis
    default_pid_file = \
171 4936f2e2 Christos Stavrakakis
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
172 78e2d194 Georgios Gousios
    parser = OptionParser()
173 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
174 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
175 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
176 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
177 3d975c75 Kostas Papadimitriou
    parser.add_option("-p", "--pid-file", dest="pid_file",
178 3d975c75 Kostas Papadimitriou
                      default=default_pid_file,
179 3d975c75 Kostas Papadimitriou
                      help="Save PID to file (default: %s)" % default_pid_file)
180 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
181 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
182 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
183 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
184 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
185 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
186 979482ce Georgios Gousios
                           first (DANGEROUS!)")
187 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
188 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
189 de081774 Georgios Gousios
190 78e2d194 Georgios Gousios
    return parser.parse_args(args)
191 78e2d194 Georgios Gousios
192 f30730c0 Georgios Gousios
193 d28244af Vangelis Koukis
def purge_queues():
194 979482ce Georgios Gousios
    """
195 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
196 979482ce Georgios Gousios
    """
197 2ef10562 Christos Stavrakakis
    client = AMQPClient(max_retries=120)
198 c4e55622 Christos Stavrakakis
    client.connect()
199 f30730c0 Georgios Gousios
200 659de616 Christos Stavrakakis
    print "Queues to be deleted: ", queues.QUEUES
201 f30730c0 Georgios Gousios
202 979482ce Georgios Gousios
    if not get_user_confirmation():
203 f30730c0 Georgios Gousios
        return
204 f30730c0 Georgios Gousios
205 659de616 Christos Stavrakakis
    for queue in queues.QUEUES:
206 c4e55622 Christos Stavrakakis
        result = client.queue_delete(queue=queue)
207 c4e55622 Christos Stavrakakis
        print "Deleting queue %s. Result: %s" % (queue, result)
208 979482ce Georgios Gousios
209 c4e55622 Christos Stavrakakis
    client.close()
210 979482ce Georgios Gousios
211 979482ce Georgios Gousios
212 979482ce Georgios Gousios
def purge_exchanges():
213 e6f5bb10 Vangelis Koukis
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
214 979482ce Georgios Gousios
    purge_queues()
215 979482ce Georgios Gousios
216 c4e55622 Christos Stavrakakis
    client = AMQPClient()
217 c4e55622 Christos Stavrakakis
    client.connect()
218 979482ce Georgios Gousios
219 659de616 Christos Stavrakakis
    exchanges = queues.EXCHANGES
220 659de616 Christos Stavrakakis
    print "Exchanges to be deleted: ", exchanges
221 979482ce Georgios Gousios
222 979482ce Georgios Gousios
    if not get_user_confirmation():
223 979482ce Georgios Gousios
        return
224 979482ce Georgios Gousios
225 659de616 Christos Stavrakakis
    for exch in exchanges:
226 659de616 Christos Stavrakakis
        result = client.exchange_delete(exchange=exch)
227 659de616 Christos Stavrakakis
        print "Deleting exchange %s. Result: %s" % (exch, result)
228 c4e55622 Christos Stavrakakis
    client.close()
229 f30730c0 Georgios Gousios
230 c183005e Georgios Gousios
231 979482ce Georgios Gousios
def drain_queue(queue):
232 e6f5bb10 Vangelis Koukis
    """Strip a (declared) queue from all outstanding messages"""
233 979482ce Georgios Gousios
    if not queue:
234 979482ce Georgios Gousios
        return
235 979482ce Georgios Gousios
236 659de616 Christos Stavrakakis
    if not queue in queues.QUEUES:
237 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
238 979482ce Georgios Gousios
        return
239 979482ce Georgios Gousios
240 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
241 979482ce Georgios Gousios
242 979482ce Georgios Gousios
    if not get_user_confirmation():
243 979482ce Georgios Gousios
        return
244 c4e55622 Christos Stavrakakis
245 c4e55622 Christos Stavrakakis
    client = AMQPClient()
246 c4e55622 Christos Stavrakakis
    client.connect()
247 979482ce Georgios Gousios
248 c4e55622 Christos Stavrakakis
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
249 e6209aa2 Georgios Gousios
250 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
251 e6209aa2 Georgios Gousios
    time.sleep(2)
252 e6209aa2 Georgios Gousios
    print "Queue draining starting"
253 e6209aa2 Georgios Gousios
254 c626e1d0 Georgios Gousios
    num_processed = 0
255 e6209aa2 Georgios Gousios
    while True:
256 c4e55622 Christos Stavrakakis
        client.basic_wait()
257 c626e1d0 Georgios Gousios
        num_processed += 1
258 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
259 c626e1d0 Georgios Gousios
260 c4e55622 Christos Stavrakakis
    client.basic_cancel(tag)
261 c4e55622 Christos Stavrakakis
    client.close()
262 95aee02c Vangelis Koukis
263 979482ce Georgios Gousios
264 979482ce Georgios Gousios
def get_user_confirmation():
265 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
266 979482ce Georgios Gousios
267 979482ce Georgios Gousios
    if not ans:
268 979482ce Georgios Gousios
        return False
269 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
270 979482ce Georgios Gousios
        return False
271 979482ce Georgios Gousios
    return True
272 979482ce Georgios Gousios
273 979482ce Georgios Gousios
274 57d0082a Georgios Gousios
def debug_mode():
275 2bf8d695 Vangelis Koukis
    disp = Dispatcher(debug=True)
276 838239fa Georgios Gousios
    disp.wait()
277 838239fa Georgios Gousios
278 838239fa Georgios Gousios
279 d28244af Vangelis Koukis
def daemon_mode(opts):
280 4936f2e2 Christos Stavrakakis
    disp = Dispatcher(debug=False)
281 4936f2e2 Christos Stavrakakis
    disp.wait()
282 c183005e Georgios Gousios
283 d28244af Vangelis Koukis
284 d01cd522 Christos Stavrakakis
def setup_logging(opts):
285 d01cd522 Christos Stavrakakis
    import logging
286 cc92b70f Christos Stavrakakis
    formatter = logging.Formatter("%(asctime)s %(name)s %(module)s"
287 cc92b70f Christos Stavrakakis
                                  " [%(levelname)s] %(message)s")
288 d01cd522 Christos Stavrakakis
    if opts.debug:
289 fdfd8c6d Christos Stavrakakis
        log_handler = logging.StreamHandler()
290 fdfd8c6d Christos Stavrakakis
        log_handler.setFormatter(formatter)
291 d01cd522 Christos Stavrakakis
    else:
292 d01cd522 Christos Stavrakakis
        import logging.handlers
293 d01cd522 Christos Stavrakakis
        log_file = "/var/log/synnefo/dispatcher.log"
294 fdfd8c6d Christos Stavrakakis
        log_handler = logging.handlers.WatchedFileHandler(log_file)
295 fdfd8c6d Christos Stavrakakis
        log_handler.setFormatter(formatter)
296 d01cd522 Christos Stavrakakis
297 fdfd8c6d Christos Stavrakakis
    for l in LOGGERS:
298 fdfd8c6d Christos Stavrakakis
        l.addHandler(log_handler)
299 fdfd8c6d Christos Stavrakakis
        l.setLevel(logging.DEBUG)
300 d01cd522 Christos Stavrakakis
301 d01cd522 Christos Stavrakakis
302 d28244af Vangelis Koukis
def main():
303 cf2a3529 Christos Stavrakakis
    (opts, args) = parse_arguments(sys.argv[1:])
304 cf2a3529 Christos Stavrakakis
305 1dc821c9 Christos Stavrakakis
    # Rename this process so 'ps' output looks like this is a native
306 8d5795b4 Christos Stavrakakis
    # executable.  Cannot seperate command-line arguments from actual name of
307 1dc821c9 Christos Stavrakakis
    # the executable by NUL bytes, so only show the name of the executable
308 1dc821c9 Christos Stavrakakis
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
309 1dc821c9 Christos Stavrakakis
    setproctitle.setproctitle(sys.argv[0])
310 d01cd522 Christos Stavrakakis
    setup_logging(opts)
311 8ec69269 Christos Stavrakakis
312 d28244af Vangelis Koukis
    # Special case for the clean up queues action
313 d28244af Vangelis Koukis
    if opts.purge_queues:
314 d28244af Vangelis Koukis
        purge_queues()
315 d28244af Vangelis Koukis
        return
316 d28244af Vangelis Koukis
317 d28244af Vangelis Koukis
    # Special case for the clean up exch action
318 d28244af Vangelis Koukis
    if opts.purge_exchanges:
319 d28244af Vangelis Koukis
        purge_exchanges()
320 d28244af Vangelis Koukis
        return
321 d28244af Vangelis Koukis
322 d28244af Vangelis Koukis
    if opts.drain_queue:
323 d28244af Vangelis Koukis
        drain_queue(opts.drain_queue)
324 d28244af Vangelis Koukis
        return
325 d28244af Vangelis Koukis
326 4936f2e2 Christos Stavrakakis
    # Debug mode, process messages without daemonizing
327 d28244af Vangelis Koukis
    if opts.debug:
328 d28244af Vangelis Koukis
        debug_mode()
329 d28244af Vangelis Koukis
        return
330 7c62bd54 Kostas Papadimitriou
331 4936f2e2 Christos Stavrakakis
    # Create pidfile,
332 4936f2e2 Christos Stavrakakis
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
333 4936f2e2 Christos Stavrakakis
334 4936f2e2 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
335 4936f2e2 Christos Stavrakakis
        log.warning("Removing stale PID lock file %s", pidf.path)
336 4936f2e2 Christos Stavrakakis
        pidf.break_lock()
337 4936f2e2 Christos Stavrakakis
338 9e98ba3c Giorgos Verigakis
    files_preserve = []
339 9e98ba3c Giorgos Verigakis
    for handler in log.handlers:
340 9e98ba3c Giorgos Verigakis
        stream = getattr(handler, 'stream')
341 9e98ba3c Giorgos Verigakis
        if stream and hasattr(stream, 'fileno'):
342 9e98ba3c Giorgos Verigakis
            files_preserve.append(handler.stream)
343 7c62bd54 Kostas Papadimitriou
344 4936f2e2 Christos Stavrakakis
    stderr_stream = None
345 4936f2e2 Christos Stavrakakis
    for handler in log.handlers:
346 4936f2e2 Christos Stavrakakis
        stream = getattr(handler, 'stream')
347 4936f2e2 Christos Stavrakakis
        if stream and hasattr(handler, 'baseFilename'):
348 4936f2e2 Christos Stavrakakis
            stderr_stream = stream
349 4936f2e2 Christos Stavrakakis
            break
350 7c62bd54 Kostas Papadimitriou
351 4936f2e2 Christos Stavrakakis
    daemon_context = daemon.DaemonContext(
352 4936f2e2 Christos Stavrakakis
        pidfile=pidf,
353 4936f2e2 Christos Stavrakakis
        umask=0022,
354 4936f2e2 Christos Stavrakakis
        stdout=stderr_stream,
355 4936f2e2 Christos Stavrakakis
        stderr=stderr_stream,
356 4936f2e2 Christos Stavrakakis
        files_preserve=files_preserve)
357 4936f2e2 Christos Stavrakakis
358 4936f2e2 Christos Stavrakakis
    try:
359 4936f2e2 Christos Stavrakakis
        daemon_context.open()
360 4936f2e2 Christos Stavrakakis
    except (pidlockfile.AlreadyLocked, LockTimeout):
361 4936f2e2 Christos Stavrakakis
        log.critical("Failed to lock pidfile %s, another instance running?",
362 4936f2e2 Christos Stavrakakis
                     pidf.path)
363 4936f2e2 Christos Stavrakakis
        sys.exit(1)
364 4936f2e2 Christos Stavrakakis
365 4936f2e2 Christos Stavrakakis
    log.info("Became a daemon")
366 4936f2e2 Christos Stavrakakis
367 4936f2e2 Christos Stavrakakis
    if 'gevent' in sys.modules:
368 4936f2e2 Christos Stavrakakis
        # A fork() has occured while daemonizing. If running in
369 4936f2e2 Christos Stavrakakis
        # gevent context we *must* reinit gevent
370 4936f2e2 Christos Stavrakakis
        log.debug("gevent imported. Reinitializing gevent")
371 4936f2e2 Christos Stavrakakis
        import gevent
372 4936f2e2 Christos Stavrakakis
        gevent.reinit()
373 d28244af Vangelis Koukis
374 d28244af Vangelis Koukis
    # Catch every exception, make sure it gets logged properly
375 d28244af Vangelis Koukis
    try:
376 d28244af Vangelis Koukis
        daemon_mode(opts)
377 d28244af Vangelis Koukis
    except Exception:
378 9e98ba3c Giorgos Verigakis
        log.exception("Unknown error")
379 d28244af Vangelis Koukis
        raise
380 d28244af Vangelis Koukis
381 7c62bd54 Kostas Papadimitriou
if __name__ == "__main__":
382 3d975c75 Kostas Papadimitriou
    sys.exit(main())
383 7c62bd54 Kostas Papadimitriou
384 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :