Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 3f018af1

History | View | Annotate | Download (11.6 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
40 c99fe4c7 Vassilios Karakoidas
41 cf2a3529 Christos Stavrakakis
# Fix path to import synnefo settings
42 d08a5f6f Vangelis Koukis
import sys
43 86221fd5 Panos Louridas
import os
44 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45 86221fd5 Panos Louridas
sys.path.append(path)
46 de470b1e Kostas Papadimitriou
from synnefo import settings
47 d08a5f6f Vangelis Koukis
setup_environ(settings)
48 d08a5f6f Vangelis Koukis
49 9e98ba3c Giorgos Verigakis
import logging
50 8d8ea051 Georgios Gousios
import time
51 4ed2e471 Georgios Gousios
52 3f018af1 Christos Stavrakakis
import daemon
53 cf2a3529 Christos Stavrakakis
import daemon.runner
54 cf2a3529 Christos Stavrakakis
from lockfile import LockTimeout
55 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
56 4ed2e471 Georgios Gousios
try:
57 cf2a3529 Christos Stavrakakis
    from daemon import pidfile as pidlockfile
58 4ed2e471 Georgios Gousios
except:
59 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
60 8d8ea051 Georgios Gousios
61 cf2a3529 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
62 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
63 9e98ba3c Giorgos Verigakis
64 3f018af1 Christos Stavrakakis
from synnefo.util.dictconfig import dictConfig
65 3f018af1 Christos Stavrakakis
dictConfig(settings.DISPATCHER_LOGGING)
66 9e98ba3c Giorgos Verigakis
log = logging.getLogger()
67 9e98ba3c Giorgos Verigakis
68 698d0666 Georgios Gousios
# Queue names
69 698d0666 Georgios Gousios
QUEUES = []
70 698d0666 Georgios Gousios
71 698d0666 Georgios Gousios
# Queue bindings to exchanges
72 698d0666 Georgios Gousios
BINDINGS = []
73 698d0666 Georgios Gousios
74 2bf8d695 Vangelis Koukis
75 78e2d194 Georgios Gousios
class Dispatcher:
76 5d081749 Georgios Gousios
    debug = False
77 78e2d194 Georgios Gousios
78 2bf8d695 Vangelis Koukis
    def __init__(self, debug=False):
79 5d081749 Georgios Gousios
        self.debug = debug
80 5d081749 Georgios Gousios
        self._init()
81 da102335 Georgios Gousios
82 78e2d194 Georgios Gousios
    def wait(self):
83 c4e55622 Christos Stavrakakis
        log.info("Waiting for messages..")
84 78e2d194 Georgios Gousios
        while True:
85 78e2d194 Georgios Gousios
            try:
86 c4e55622 Christos Stavrakakis
                self.client.basic_wait()
87 78e2d194 Georgios Gousios
            except SystemExit:
88 78e2d194 Georgios Gousios
                break
89 c4e55622 Christos Stavrakakis
            except Exception as e:
90 c4e55622 Christos Stavrakakis
                log.exception("Caught unexpected exception: %s", e)
91 c4e55622 Christos Stavrakakis
92 c4e55622 Christos Stavrakakis
        self.client.basic_cancel()
93 c4e55622 Christos Stavrakakis
        self.client.close()
94 78e2d194 Georgios Gousios
95 5d081749 Georgios Gousios
    def _init(self):
96 698d0666 Georgios Gousios
        global QUEUES, BINDINGS
97 9e98ba3c Giorgos Verigakis
        log.info("Initializing")
98 226f086a Georgios Gousios
99 c4e55622 Christos Stavrakakis
        self.client = AMQPClient()
100 c4e55622 Christos Stavrakakis
        # Connect to AMQP host
101 c4e55622 Christos Stavrakakis
        self.client.connect()
102 78e2d194 Georgios Gousios
103 c183005e Georgios Gousios
        # Declare queues and exchanges
104 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
105 db400d82 Christos Stavrakakis
            self.client.exchange_declare(exchange=exchange,
106 db400d82 Christos Stavrakakis
                                         type="topic")
107 f30730c0 Georgios Gousios
108 226f086a Georgios Gousios
        for queue in QUEUES:
109 c4e55622 Christos Stavrakakis
            # Queues are mirrored to all RabbitMQ brokers
110 db400d82 Christos Stavrakakis
            self.client.queue_declare(queue=queue, mirrored=True)
111 78e2d194 Georgios Gousios
112 226f086a Georgios Gousios
        bindings = BINDINGS
113 78e2d194 Georgios Gousios
114 c183005e Georgios Gousios
        # Bind queues to handler methods
115 5d081749 Georgios Gousios
        for binding in bindings:
116 78e2d194 Georgios Gousios
            try:
117 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
118 23c84263 Georgios Gousios
            except AttributeError:
119 9e98ba3c Giorgos Verigakis
                log.error("Cannot find callback %s", binding[3])
120 e6f5bb10 Vangelis Koukis
                raise SystemExit(1)
121 8d8ea051 Georgios Gousios
122 c4e55622 Christos Stavrakakis
            self.client.queue_bind(queue=binding[0], exchange=binding[1],
123 c4e55622 Christos Stavrakakis
                                   routing_key=binding[2])
124 c4e55622 Christos Stavrakakis
125 370f69ec Christos Stavrakakis
            self.client.basic_consume(queue=binding[0],
126 c4e55622 Christos Stavrakakis
                                                        callback=callback)
127 c4e55622 Christos Stavrakakis
128 9e98ba3c Giorgos Verigakis
            log.debug("Binding %s(%s) to queue %s with handler %s",
129 c4e55622 Christos Stavrakakis
                      binding[1], binding[2], binding[0], binding[3])
130 8d8ea051 Georgios Gousios
131 c183005e Georgios Gousios
132 698d0666 Georgios Gousios
def _init_queues():
133 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
134 698d0666 Georgios Gousios
135 698d0666 Georgios Gousios
    # Queue declarations
136 698d0666 Georgios Gousios
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
137 698d0666 Georgios Gousios
138 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
139 a17a8e98 Christos Stavrakakis
    QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
140 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
141 9068cd85 Georgios Gousios
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
142 698d0666 Georgios Gousios
    QUEUE_RECONC = "%s-reconciliation" % prefix
143 698d0666 Georgios Gousios
    if settings.DEBUG is True:
144 b47b895d Christos Stavrakakis
        QUEUE_DEBUG = "%s-debug" % prefix  # Debug queue, retrieves all messages
145 698d0666 Georgios Gousios
146 a17a8e98 Christos Stavrakakis
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
147 fd56d250 Giorgos Verigakis
              QUEUE_GANETI_BUILD_PROGR)
148 698d0666 Georgios Gousios
149 698d0666 Georgios Gousios
    # notifications of type "ganeti-op-status"
150 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
151 a17a8e98 Christos Stavrakakis
    # notifications of type "ganeti-network-status"
152 a17a8e98 Christos Stavrakakis
    DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
153 698d0666 Georgios Gousios
    # notifications of type "ganeti-net-status"
154 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
155 e6f5bb10 Vangelis Koukis
    # notifications of type "ganeti-create-progress"
156 e6f5bb10 Vangelis Koukis
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
157 0230cd3a Vangelis Koukis
    # reconciliation
158 633d9cfa Georgios Gousios
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
159 698d0666 Georgios Gousios
160 698d0666 Georgios Gousios
    BINDINGS = [
161 9068cd85 Georgios Gousios
    # Queue                   # Exchange                # RouteKey              # Handler
162 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
163 a17a8e98 Christos Stavrakakis
    (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
164 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
165 cf2a3529 Christos Stavrakakis
    (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
166 698d0666 Georgios Gousios
    ]
167 698d0666 Georgios Gousios
168 698d0666 Georgios Gousios
    if settings.DEBUG is True:
169 698d0666 Georgios Gousios
        BINDINGS += [
170 698d0666 Georgios Gousios
            # Queue       # Exchange          # RouteKey  # Handler
171 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
172 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
173 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
174 698d0666 Georgios Gousios
        ]
175 698d0666 Georgios Gousios
        QUEUES += (QUEUE_DEBUG,)
176 698d0666 Georgios Gousios
177 698d0666 Georgios Gousios
178 78e2d194 Georgios Gousios
def parse_arguments(args):
179 78e2d194 Georgios Gousios
    from optparse import OptionParser
180 78e2d194 Georgios Gousios
181 3f018af1 Christos Stavrakakis
    default_pid_file = \
182 3f018af1 Christos Stavrakakis
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
183 78e2d194 Georgios Gousios
    parser = OptionParser()
184 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
185 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
186 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
187 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
188 3d975c75 Kostas Papadimitriou
    parser.add_option("-p", "--pid-file", dest="pid_file",
189 3d975c75 Kostas Papadimitriou
                      default=default_pid_file,
190 3d975c75 Kostas Papadimitriou
                      help="Save PID to file (default: %s)" % default_pid_file)
191 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
192 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
193 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
194 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
195 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
196 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
197 979482ce Georgios Gousios
                           first (DANGEROUS!)")
198 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
199 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
200 de081774 Georgios Gousios
201 78e2d194 Georgios Gousios
    return parser.parse_args(args)
202 78e2d194 Georgios Gousios
203 f30730c0 Georgios Gousios
204 d28244af Vangelis Koukis
def purge_queues():
205 979482ce Georgios Gousios
    """
206 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
207 979482ce Georgios Gousios
    """
208 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
209 c4e55622 Christos Stavrakakis
    client = AMQPClient()
210 c4e55622 Christos Stavrakakis
    client.connect()
211 f30730c0 Georgios Gousios
212 698d0666 Georgios Gousios
    print "Queues to be deleted: ", QUEUES
213 f30730c0 Georgios Gousios
214 979482ce Georgios Gousios
    if not get_user_confirmation():
215 f30730c0 Georgios Gousios
        return
216 f30730c0 Georgios Gousios
217 698d0666 Georgios Gousios
    for queue in QUEUES:
218 c4e55622 Christos Stavrakakis
        result = client.queue_delete(queue=queue)
219 c4e55622 Christos Stavrakakis
        print "Deleting queue %s. Result: %s" % (queue, result)
220 979482ce Georgios Gousios
221 c4e55622 Christos Stavrakakis
    client.close()
222 979482ce Georgios Gousios
223 979482ce Georgios Gousios
224 979482ce Georgios Gousios
def purge_exchanges():
225 e6f5bb10 Vangelis Koukis
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
226 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
227 979482ce Georgios Gousios
    purge_queues()
228 979482ce Georgios Gousios
229 c4e55622 Christos Stavrakakis
    client = AMQPClient()
230 c4e55622 Christos Stavrakakis
    client.connect()
231 979482ce Georgios Gousios
232 e6f5bb10 Vangelis Koukis
    print "Exchanges to be deleted: ", settings.EXCHANGES
233 979482ce Georgios Gousios
234 979482ce Georgios Gousios
    if not get_user_confirmation():
235 979482ce Georgios Gousios
        return
236 979482ce Georgios Gousios
237 979482ce Georgios Gousios
    for exchange in settings.EXCHANGES:
238 c4e55622 Christos Stavrakakis
        result = client.exchange_delete(exchange=exchange)
239 c4e55622 Christos Stavrakakis
        print "Deleting exchange %s. Result: %s" % (exchange, result)
240 979482ce Georgios Gousios
241 c4e55622 Christos Stavrakakis
    client.close()
242 f30730c0 Georgios Gousios
243 c183005e Georgios Gousios
244 979482ce Georgios Gousios
def drain_queue(queue):
245 e6f5bb10 Vangelis Koukis
    """Strip a (declared) queue from all outstanding messages"""
246 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
247 979482ce Georgios Gousios
    if not queue:
248 979482ce Georgios Gousios
        return
249 979482ce Georgios Gousios
250 698d0666 Georgios Gousios
    if not queue in QUEUES:
251 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
252 979482ce Georgios Gousios
        return
253 979482ce Georgios Gousios
254 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
255 979482ce Georgios Gousios
256 979482ce Georgios Gousios
    if not get_user_confirmation():
257 979482ce Georgios Gousios
        return
258 c4e55622 Christos Stavrakakis
259 c4e55622 Christos Stavrakakis
    client = AMQPClient()
260 c4e55622 Christos Stavrakakis
    client.connect()
261 979482ce Georgios Gousios
262 c4e55622 Christos Stavrakakis
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
263 e6209aa2 Georgios Gousios
264 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
265 e6209aa2 Georgios Gousios
    time.sleep(2)
266 e6209aa2 Georgios Gousios
    print "Queue draining starting"
267 e6209aa2 Georgios Gousios
268 c626e1d0 Georgios Gousios
    num_processed = 0
269 e6209aa2 Georgios Gousios
    while True:
270 c4e55622 Christos Stavrakakis
        client.basic_wait()
271 c626e1d0 Georgios Gousios
        num_processed += 1
272 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
273 c626e1d0 Georgios Gousios
274 c4e55622 Christos Stavrakakis
    client.basic_cancel(tag)
275 c4e55622 Christos Stavrakakis
    client.close()
276 95aee02c Vangelis Koukis
277 979482ce Georgios Gousios
278 979482ce Georgios Gousios
def get_user_confirmation():
279 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
280 979482ce Georgios Gousios
281 979482ce Georgios Gousios
    if not ans:
282 979482ce Georgios Gousios
        return False
283 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
284 979482ce Georgios Gousios
        return False
285 979482ce Georgios Gousios
    return True
286 979482ce Georgios Gousios
287 979482ce Georgios Gousios
288 57d0082a Georgios Gousios
def debug_mode():
289 2bf8d695 Vangelis Koukis
    disp = Dispatcher(debug=True)
290 838239fa Georgios Gousios
    disp.wait()
291 838239fa Georgios Gousios
292 838239fa Georgios Gousios
293 d28244af Vangelis Koukis
def daemon_mode(opts):
294 3f018af1 Christos Stavrakakis
    disp = Dispatcher(debug=False)
295 3f018af1 Christos Stavrakakis
    disp.wait()
296 c183005e Georgios Gousios
297 d28244af Vangelis Koukis
298 d28244af Vangelis Koukis
def main():
299 cf2a3529 Christos Stavrakakis
    (opts, args) = parse_arguments(sys.argv[1:])
300 cf2a3529 Christos Stavrakakis
301 d28244af Vangelis Koukis
    # Init the global variables containing the queues
302 d28244af Vangelis Koukis
    _init_queues()
303 d28244af Vangelis Koukis
304 d28244af Vangelis Koukis
    # Special case for the clean up queues action
305 d28244af Vangelis Koukis
    if opts.purge_queues:
306 d28244af Vangelis Koukis
        purge_queues()
307 d28244af Vangelis Koukis
        return
308 d28244af Vangelis Koukis
309 d28244af Vangelis Koukis
    # Special case for the clean up exch action
310 d28244af Vangelis Koukis
    if opts.purge_exchanges:
311 d28244af Vangelis Koukis
        purge_exchanges()
312 d28244af Vangelis Koukis
        return
313 d28244af Vangelis Koukis
314 d28244af Vangelis Koukis
    if opts.drain_queue:
315 d28244af Vangelis Koukis
        drain_queue(opts.drain_queue)
316 d28244af Vangelis Koukis
        return
317 d28244af Vangelis Koukis
318 3f018af1 Christos Stavrakakis
    # Debug mode, process messages without daemonizing
319 d28244af Vangelis Koukis
    if opts.debug:
320 d28244af Vangelis Koukis
        debug_mode()
321 d28244af Vangelis Koukis
        return
322 7c62bd54 Kostas Papadimitriou
323 3f018af1 Christos Stavrakakis
    # Create pidfile,
324 3f018af1 Christos Stavrakakis
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
325 3f018af1 Christos Stavrakakis
326 3f018af1 Christos Stavrakakis
    if daemon.runner.is_pidfile_stale(pidf):
327 3f018af1 Christos Stavrakakis
        log.warning("Removing stale PID lock file %s", pidf.path)
328 3f018af1 Christos Stavrakakis
        pidf.break_lock()
329 3f018af1 Christos Stavrakakis
330 9e98ba3c Giorgos Verigakis
    files_preserve = []
331 9e98ba3c Giorgos Verigakis
    for handler in log.handlers:
332 9e98ba3c Giorgos Verigakis
        stream = getattr(handler, 'stream')
333 9e98ba3c Giorgos Verigakis
        if stream and hasattr(stream, 'fileno'):
334 9e98ba3c Giorgos Verigakis
            files_preserve.append(handler.stream)
335 7c62bd54 Kostas Papadimitriou
336 3f018af1 Christos Stavrakakis
    stderr_stream = None
337 3f018af1 Christos Stavrakakis
    for handler in log.handlers:
338 3f018af1 Christos Stavrakakis
        stream = getattr(handler, 'stream')
339 3f018af1 Christos Stavrakakis
        if stream and hasattr(handler, 'baseFilename'):
340 3f018af1 Christos Stavrakakis
            stderr_stream = stream
341 3f018af1 Christos Stavrakakis
            break
342 7c62bd54 Kostas Papadimitriou
343 3f018af1 Christos Stavrakakis
    daemon_context = daemon.DaemonContext(
344 3f018af1 Christos Stavrakakis
        pidfile=pidf,
345 3f018af1 Christos Stavrakakis
        umask=0022,
346 3f018af1 Christos Stavrakakis
        stdout=stderr_stream,
347 3f018af1 Christos Stavrakakis
        stderr=stderr_stream,
348 3f018af1 Christos Stavrakakis
        files_preserve=files_preserve)
349 3f018af1 Christos Stavrakakis
350 3f018af1 Christos Stavrakakis
    try:
351 3f018af1 Christos Stavrakakis
        daemon_context.open()
352 3f018af1 Christos Stavrakakis
    except (pidlockfile.AlreadyLocked, LockTimeout):
353 3f018af1 Christos Stavrakakis
        log.critical("Failed to lock pidfile %s, another instance running?",
354 3f018af1 Christos Stavrakakis
                     pidf.path)
355 3f018af1 Christos Stavrakakis
        sys.exit(1)
356 3f018af1 Christos Stavrakakis
357 3f018af1 Christos Stavrakakis
    log.info("Became a daemon")
358 3f018af1 Christos Stavrakakis
359 3f018af1 Christos Stavrakakis
    if 'gevent' in sys.modules:
360 3f018af1 Christos Stavrakakis
        # A fork() has occured while daemonizing. If running in
361 3f018af1 Christos Stavrakakis
        # gevent context we *must* reinit gevent
362 3f018af1 Christos Stavrakakis
        log.debug("gevent imported. Reinitializing gevent")
363 3f018af1 Christos Stavrakakis
        import gevent
364 3f018af1 Christos Stavrakakis
        gevent.reinit()
365 d28244af Vangelis Koukis
366 d28244af Vangelis Koukis
    # Catch every exception, make sure it gets logged properly
367 d28244af Vangelis Koukis
    try:
368 d28244af Vangelis Koukis
        daemon_mode(opts)
369 d28244af Vangelis Koukis
    except Exception:
370 9e98ba3c Giorgos Verigakis
        log.exception("Unknown error")
371 d28244af Vangelis Koukis
        raise
372 d28244af Vangelis Koukis
373 7c62bd54 Kostas Papadimitriou
if __name__ == "__main__":
374 3d975c75 Kostas Papadimitriou
    sys.exit(main())
375 7c62bd54 Kostas Papadimitriou
376 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :