root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 3f018af1
History | View | Annotate | Download (11.6 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 48130e66 | Georgios Gousios | # Copyright 2011 GRNET S.A. All rights reserved.
|
3 | 87ace70f | Vassilios Karakoidas | #
|
4 | 48130e66 | Georgios Gousios | # Redistribution and use in source and binary forms, with or without
|
5 | 48130e66 | Georgios Gousios | # modification, are permitted provided that the following conditions
|
6 | 48130e66 | Georgios Gousios | # are met:
|
7 | 7bd50624 | Vassilios Karakoidas | #
|
8 | 48130e66 | Georgios Gousios | # 1. Redistributions of source code must retain the above copyright
|
9 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer.
|
10 | 48130e66 | Georgios Gousios | #
|
11 | 48130e66 | Georgios Gousios | # 2. Redistributions in binary form must reproduce the above copyright
|
12 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer in the
|
13 | 48130e66 | Georgios Gousios | # documentation and/or other materials provided with the distribution.
|
14 | 48130e66 | Georgios Gousios | #
|
15 | 48130e66 | Georgios Gousios | # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
16 | 48130e66 | Georgios Gousios | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
17 | 48130e66 | Georgios Gousios | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
18 | 48130e66 | Georgios Gousios | # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
19 | 48130e66 | Georgios Gousios | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
20 | 48130e66 | Georgios Gousios | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
21 | 48130e66 | Georgios Gousios | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
22 | 48130e66 | Georgios Gousios | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
23 | 48130e66 | Georgios Gousios | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
24 | 48130e66 | Georgios Gousios | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
25 | 48130e66 | Georgios Gousios | # SUCH DAMAGE.
|
26 | 48130e66 | Georgios Gousios | #
|
27 | 48130e66 | Georgios Gousios | # The views and conclusions contained in the software and documentation are
|
28 | 48130e66 | Georgios Gousios | # those of the authors and should not be interpreted as representing official
|
29 | 48130e66 | Georgios Gousios | # policies, either expressed or implied, of GRNET S.A.
|
30 | 48130e66 | Georgios Gousios | |
31 | 48130e66 | Georgios Gousios | |
32 | e6209aa2 | Georgios Gousios | """ Message queue setup, dispatch and admin
|
33 | c183005e | Georgios Gousios |
|
34 | 2cd99e7a | Georgios Gousios | This program sets up connections to the queues configured in settings.py
|
35 | 2cd99e7a | Georgios Gousios | and implements the message wait and dispatch loops. Actual messages are
|
36 | 2cd99e7a | Georgios Gousios | handled in the dispatched functions.
|
37 | fcbc5bb3 | Vassilios Karakoidas |
|
38 | d08a5f6f | Vangelis Koukis | """
|
39 | d08a5f6f | Vangelis Koukis | from django.core.management import setup_environ |
40 | c99fe4c7 | Vassilios Karakoidas | |
41 | cf2a3529 | Christos Stavrakakis | # Fix path to import synnefo settings
|
42 | d08a5f6f | Vangelis Koukis | import sys |
43 | 86221fd5 | Panos Louridas | import os |
44 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
45 | 86221fd5 | Panos Louridas | sys.path.append(path) |
46 | de470b1e | Kostas Papadimitriou | from synnefo import settings |
47 | d08a5f6f | Vangelis Koukis | setup_environ(settings) |
48 | d08a5f6f | Vangelis Koukis | |
49 | 9e98ba3c | Giorgos Verigakis | import logging |
50 | 8d8ea051 | Georgios Gousios | import time |
51 | 4ed2e471 | Georgios Gousios | |
52 | 3f018af1 | Christos Stavrakakis | import daemon |
53 | cf2a3529 | Christos Stavrakakis | import daemon.runner |
54 | cf2a3529 | Christos Stavrakakis | from lockfile import LockTimeout |
55 | 4ed2e471 | Georgios Gousios | # Take care of differences between python-daemon versions.
|
56 | 4ed2e471 | Georgios Gousios | try:
|
57 | cf2a3529 | Christos Stavrakakis | from daemon import pidfile as pidlockfile |
58 | 4ed2e471 | Georgios Gousios | except:
|
59 | 4ed2e471 | Georgios Gousios | from daemon import pidlockfile |
60 | 8d8ea051 | Georgios Gousios | |
61 | cf2a3529 | Christos Stavrakakis | from synnefo.lib.amqp import AMQPClient |
62 | 9cb903f9 | Vangelis Koukis | from synnefo.logic import callbacks |
63 | 9e98ba3c | Giorgos Verigakis | |
64 | 3f018af1 | Christos Stavrakakis | from synnefo.util.dictconfig import dictConfig |
65 | 3f018af1 | Christos Stavrakakis | dictConfig(settings.DISPATCHER_LOGGING) |
66 | 9e98ba3c | Giorgos Verigakis | log = logging.getLogger() |
67 | 9e98ba3c | Giorgos Verigakis | |
68 | 698d0666 | Georgios Gousios | # Queue names
|
69 | 698d0666 | Georgios Gousios | QUEUES = [] |
70 | 698d0666 | Georgios Gousios | |
71 | 698d0666 | Georgios Gousios | # Queue bindings to exchanges
|
72 | 698d0666 | Georgios Gousios | BINDINGS = [] |
73 | 698d0666 | Georgios Gousios | |
74 | 2bf8d695 | Vangelis Koukis | |
75 | 78e2d194 | Georgios Gousios | class Dispatcher: |
76 | 5d081749 | Georgios Gousios | debug = False
|
77 | 78e2d194 | Georgios Gousios | |
78 | 2bf8d695 | Vangelis Koukis | def __init__(self, debug=False): |
79 | 5d081749 | Georgios Gousios | self.debug = debug
|
80 | 5d081749 | Georgios Gousios | self._init()
|
81 | da102335 | Georgios Gousios | |
82 | 78e2d194 | Georgios Gousios | def wait(self): |
83 | c4e55622 | Christos Stavrakakis | log.info("Waiting for messages..")
|
84 | 78e2d194 | Georgios Gousios | while True: |
85 | 78e2d194 | Georgios Gousios | try:
|
86 | c4e55622 | Christos Stavrakakis | self.client.basic_wait()
|
87 | 78e2d194 | Georgios Gousios | except SystemExit: |
88 | 78e2d194 | Georgios Gousios | break
|
89 | c4e55622 | Christos Stavrakakis | except Exception as e: |
90 | c4e55622 | Christos Stavrakakis | log.exception("Caught unexpected exception: %s", e)
|
91 | c4e55622 | Christos Stavrakakis | |
92 | c4e55622 | Christos Stavrakakis | self.client.basic_cancel()
|
93 | c4e55622 | Christos Stavrakakis | self.client.close()
|
94 | 78e2d194 | Georgios Gousios | |
95 | 5d081749 | Georgios Gousios | def _init(self): |
96 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
97 | 9e98ba3c | Giorgos Verigakis | log.info("Initializing")
|
98 | 226f086a | Georgios Gousios | |
99 | c4e55622 | Christos Stavrakakis | self.client = AMQPClient()
|
100 | c4e55622 | Christos Stavrakakis | # Connect to AMQP host
|
101 | c4e55622 | Christos Stavrakakis | self.client.connect()
|
102 | 78e2d194 | Georgios Gousios | |
103 | c183005e | Georgios Gousios | # Declare queues and exchanges
|
104 | f30730c0 | Georgios Gousios | for exchange in settings.EXCHANGES: |
105 | db400d82 | Christos Stavrakakis | self.client.exchange_declare(exchange=exchange,
|
106 | db400d82 | Christos Stavrakakis | type="topic")
|
107 | f30730c0 | Georgios Gousios | |
108 | 226f086a | Georgios Gousios | for queue in QUEUES: |
109 | c4e55622 | Christos Stavrakakis | # Queues are mirrored to all RabbitMQ brokers
|
110 | db400d82 | Christos Stavrakakis | self.client.queue_declare(queue=queue, mirrored=True) |
111 | 78e2d194 | Georgios Gousios | |
112 | 226f086a | Georgios Gousios | bindings = BINDINGS |
113 | 78e2d194 | Georgios Gousios | |
114 | c183005e | Georgios Gousios | # Bind queues to handler methods
|
115 | 5d081749 | Georgios Gousios | for binding in bindings: |
116 | 78e2d194 | Georgios Gousios | try:
|
117 | 9cb903f9 | Vangelis Koukis | callback = getattr(callbacks, binding[3]) |
118 | 23c84263 | Georgios Gousios | except AttributeError: |
119 | 9e98ba3c | Giorgos Verigakis | log.error("Cannot find callback %s", binding[3]) |
120 | e6f5bb10 | Vangelis Koukis | raise SystemExit(1) |
121 | 8d8ea051 | Georgios Gousios | |
122 | c4e55622 | Christos Stavrakakis | self.client.queue_bind(queue=binding[0], exchange=binding[1], |
123 | c4e55622 | Christos Stavrakakis | routing_key=binding[2])
|
124 | c4e55622 | Christos Stavrakakis | |
125 | 370f69ec | Christos Stavrakakis | self.client.basic_consume(queue=binding[0], |
126 | c4e55622 | Christos Stavrakakis | callback=callback) |
127 | c4e55622 | Christos Stavrakakis | |
128 | 9e98ba3c | Giorgos Verigakis | log.debug("Binding %s(%s) to queue %s with handler %s",
|
129 | c4e55622 | Christos Stavrakakis | binding[1], binding[2], binding[0], binding[3]) |
130 | 8d8ea051 | Georgios Gousios | |
131 | c183005e | Georgios Gousios | |
132 | 698d0666 | Georgios Gousios | def _init_queues(): |
133 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
134 | 698d0666 | Georgios Gousios | |
135 | 698d0666 | Georgios Gousios | # Queue declarations
|
136 | 698d0666 | Georgios Gousios | prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
137 | 698d0666 | Georgios Gousios | |
138 | 698d0666 | Georgios Gousios | QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
|
139 | a17a8e98 | Christos Stavrakakis | QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
|
140 | 698d0666 | Georgios Gousios | QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
|
141 | 9068cd85 | Georgios Gousios | QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
|
142 | 698d0666 | Georgios Gousios | QUEUE_RECONC = "%s-reconciliation" % prefix
|
143 | 698d0666 | Georgios Gousios | if settings.DEBUG is True: |
144 | b47b895d | Christos Stavrakakis | QUEUE_DEBUG = "%s-debug" % prefix # Debug queue, retrieves all messages |
145 | 698d0666 | Georgios Gousios | |
146 | a17a8e98 | Christos Stavrakakis | QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC, |
147 | fd56d250 | Giorgos Verigakis | QUEUE_GANETI_BUILD_PROGR) |
148 | 698d0666 | Georgios Gousios | |
149 | 698d0666 | Georgios Gousios | # notifications of type "ganeti-op-status"
|
150 | 2bf8d695 | Vangelis Koukis | DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
|
151 | a17a8e98 | Christos Stavrakakis | # notifications of type "ganeti-network-status"
|
152 | a17a8e98 | Christos Stavrakakis | DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
|
153 | 698d0666 | Georgios Gousios | # notifications of type "ganeti-net-status"
|
154 | 2bf8d695 | Vangelis Koukis | DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
|
155 | e6f5bb10 | Vangelis Koukis | # notifications of type "ganeti-create-progress"
|
156 | e6f5bb10 | Vangelis Koukis | BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
|
157 | 0230cd3a | Vangelis Koukis | # reconciliation
|
158 | 633d9cfa | Georgios Gousios | RECONC_HANDLER = 'reconciliation.%s.*' % prefix
|
159 | 698d0666 | Georgios Gousios | |
160 | 698d0666 | Georgios Gousios | BINDINGS = [ |
161 | 9068cd85 | Georgios Gousios | # Queue # Exchange # RouteKey # Handler
|
162 | 9068cd85 | Georgios Gousios | (QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'),
|
163 | a17a8e98 | Christos Stavrakakis | (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
|
164 | 9068cd85 | Georgios Gousios | (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
|
165 | cf2a3529 | Christos Stavrakakis | (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'),
|
166 | 698d0666 | Georgios Gousios | ] |
167 | 698d0666 | Georgios Gousios | |
168 | 698d0666 | Georgios Gousios | if settings.DEBUG is True: |
169 | 698d0666 | Georgios Gousios | BINDINGS += [ |
170 | 698d0666 | Georgios Gousios | # Queue # Exchange # RouteKey # Handler
|
171 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
172 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
173 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
174 | 698d0666 | Georgios Gousios | ] |
175 | 698d0666 | Georgios Gousios | QUEUES += (QUEUE_DEBUG,) |
176 | 698d0666 | Georgios Gousios | |
177 | 698d0666 | Georgios Gousios | |
178 | 78e2d194 | Georgios Gousios | def parse_arguments(args): |
179 | 78e2d194 | Georgios Gousios | from optparse import OptionParser |
180 | 78e2d194 | Georgios Gousios | |
181 | 3f018af1 | Christos Stavrakakis | default_pid_file = \ |
182 | 3f018af1 | Christos Stavrakakis | os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:] |
183 | 78e2d194 | Georgios Gousios | parser = OptionParser() |
184 | c183005e | Georgios Gousios | parser.add_option("-d", "--debug", action="store_true", default=False, |
185 | 2cd99e7a | Georgios Gousios | dest="debug", help="Enable debug mode") |
186 | e6209aa2 | Georgios Gousios | parser.add_option("-w", "--workers", default=2, dest="workers", |
187 | e6209aa2 | Georgios Gousios | help="Number of workers to spawn", type="int") |
188 | 3d975c75 | Kostas Papadimitriou | parser.add_option("-p", "--pid-file", dest="pid_file", |
189 | 3d975c75 | Kostas Papadimitriou | default=default_pid_file, |
190 | 3d975c75 | Kostas Papadimitriou | help="Save PID to file (default: %s)" % default_pid_file)
|
191 | 979482ce | Georgios Gousios | parser.add_option("--purge-queues", action="store_true", |
192 | 979482ce | Georgios Gousios | default=False, dest="purge_queues", |
193 | 57d0082a | Georgios Gousios | help="Remove all declared queues (DANGEROUS!)")
|
194 | 979482ce | Georgios Gousios | parser.add_option("--purge-exchanges", action="store_true", |
195 | 979482ce | Georgios Gousios | default=False, dest="purge_exchanges", |
196 | 979482ce | Georgios Gousios | help="Remove all exchanges. Implies deleting all queues \
|
197 | 979482ce | Georgios Gousios | first (DANGEROUS!)")
|
198 | d5470cdd | Georgios Gousios | parser.add_option("--drain-queue", dest="drain_queue", |
199 | e6209aa2 | Georgios Gousios | help="Strips a queue from all outstanding messages")
|
200 | de081774 | Georgios Gousios | |
201 | 78e2d194 | Georgios Gousios | return parser.parse_args(args)
|
202 | 78e2d194 | Georgios Gousios | |
203 | f30730c0 | Georgios Gousios | |
204 | d28244af | Vangelis Koukis | def purge_queues(): |
205 | 979482ce | Georgios Gousios | """
|
206 | 979482ce | Georgios Gousios | Delete declared queues from RabbitMQ. Use with care!
|
207 | 979482ce | Georgios Gousios | """
|
208 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
209 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
210 | c4e55622 | Christos Stavrakakis | client.connect() |
211 | f30730c0 | Georgios Gousios | |
212 | 698d0666 | Georgios Gousios | print "Queues to be deleted: ", QUEUES |
213 | f30730c0 | Georgios Gousios | |
214 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
215 | f30730c0 | Georgios Gousios | return
|
216 | f30730c0 | Georgios Gousios | |
217 | 698d0666 | Georgios Gousios | for queue in QUEUES: |
218 | c4e55622 | Christos Stavrakakis | result = client.queue_delete(queue=queue) |
219 | c4e55622 | Christos Stavrakakis | print "Deleting queue %s. Result: %s" % (queue, result) |
220 | 979482ce | Georgios Gousios | |
221 | c4e55622 | Christos Stavrakakis | client.close() |
222 | 979482ce | Georgios Gousios | |
223 | 979482ce | Georgios Gousios | |
224 | 979482ce | Georgios Gousios | def purge_exchanges(): |
225 | e6f5bb10 | Vangelis Koukis | """Delete declared exchanges from RabbitMQ, after removing all queues"""
|
226 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
227 | 979482ce | Georgios Gousios | purge_queues() |
228 | 979482ce | Georgios Gousios | |
229 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
230 | c4e55622 | Christos Stavrakakis | client.connect() |
231 | 979482ce | Georgios Gousios | |
232 | e6f5bb10 | Vangelis Koukis | print "Exchanges to be deleted: ", settings.EXCHANGES |
233 | 979482ce | Georgios Gousios | |
234 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
235 | 979482ce | Georgios Gousios | return
|
236 | 979482ce | Georgios Gousios | |
237 | 979482ce | Georgios Gousios | for exchange in settings.EXCHANGES: |
238 | c4e55622 | Christos Stavrakakis | result = client.exchange_delete(exchange=exchange) |
239 | c4e55622 | Christos Stavrakakis | print "Deleting exchange %s. Result: %s" % (exchange, result) |
240 | 979482ce | Georgios Gousios | |
241 | c4e55622 | Christos Stavrakakis | client.close() |
242 | f30730c0 | Georgios Gousios | |
243 | c183005e | Georgios Gousios | |
244 | 979482ce | Georgios Gousios | def drain_queue(queue): |
245 | e6f5bb10 | Vangelis Koukis | """Strip a (declared) queue from all outstanding messages"""
|
246 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
247 | 979482ce | Georgios Gousios | if not queue: |
248 | 979482ce | Georgios Gousios | return
|
249 | 979482ce | Georgios Gousios | |
250 | 698d0666 | Georgios Gousios | if not queue in QUEUES: |
251 | 979482ce | Georgios Gousios | print "Queue %s not configured" % queue |
252 | 979482ce | Georgios Gousios | return
|
253 | 979482ce | Georgios Gousios | |
254 | 979482ce | Georgios Gousios | print "Queue to be drained: %s" % queue |
255 | 979482ce | Georgios Gousios | |
256 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
257 | 979482ce | Georgios Gousios | return
|
258 | c4e55622 | Christos Stavrakakis | |
259 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
260 | c4e55622 | Christos Stavrakakis | client.connect() |
261 | 979482ce | Georgios Gousios | |
262 | c4e55622 | Christos Stavrakakis | tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
263 | e6209aa2 | Georgios Gousios | |
264 | e6209aa2 | Georgios Gousios | print "Queue draining about to start, hit Ctrl+c when done" |
265 | e6209aa2 | Georgios Gousios | time.sleep(2)
|
266 | e6209aa2 | Georgios Gousios | print "Queue draining starting" |
267 | e6209aa2 | Georgios Gousios | |
268 | c626e1d0 | Georgios Gousios | num_processed = 0
|
269 | e6209aa2 | Georgios Gousios | while True: |
270 | c4e55622 | Christos Stavrakakis | client.basic_wait() |
271 | c626e1d0 | Georgios Gousios | num_processed += 1
|
272 | c626e1d0 | Georgios Gousios | sys.stderr.write("Ignored %d messages\r" % num_processed)
|
273 | c626e1d0 | Georgios Gousios | |
274 | c4e55622 | Christos Stavrakakis | client.basic_cancel(tag) |
275 | c4e55622 | Christos Stavrakakis | client.close() |
276 | 95aee02c | Vangelis Koukis | |
277 | 979482ce | Georgios Gousios | |
278 | 979482ce | Georgios Gousios | def get_user_confirmation(): |
279 | 979482ce | Georgios Gousios | ans = raw_input("Are you sure (N/y):") |
280 | 979482ce | Georgios Gousios | |
281 | 979482ce | Georgios Gousios | if not ans: |
282 | 979482ce | Georgios Gousios | return False |
283 | 979482ce | Georgios Gousios | if ans not in ['Y', 'y']: |
284 | 979482ce | Georgios Gousios | return False |
285 | 979482ce | Georgios Gousios | return True |
286 | 979482ce | Georgios Gousios | |
287 | 979482ce | Georgios Gousios | |
288 | 57d0082a | Georgios Gousios | def debug_mode(): |
289 | 2bf8d695 | Vangelis Koukis | disp = Dispatcher(debug=True)
|
290 | 838239fa | Georgios Gousios | disp.wait() |
291 | 838239fa | Georgios Gousios | |
292 | 838239fa | Georgios Gousios | |
293 | d28244af | Vangelis Koukis | def daemon_mode(opts): |
294 | 3f018af1 | Christos Stavrakakis | disp = Dispatcher(debug=False)
|
295 | 3f018af1 | Christos Stavrakakis | disp.wait() |
296 | c183005e | Georgios Gousios | |
297 | d28244af | Vangelis Koukis | |
298 | d28244af | Vangelis Koukis | def main(): |
299 | cf2a3529 | Christos Stavrakakis | (opts, args) = parse_arguments(sys.argv[1:])
|
300 | cf2a3529 | Christos Stavrakakis | |
301 | d28244af | Vangelis Koukis | # Init the global variables containing the queues
|
302 | d28244af | Vangelis Koukis | _init_queues() |
303 | d28244af | Vangelis Koukis | |
304 | d28244af | Vangelis Koukis | # Special case for the clean up queues action
|
305 | d28244af | Vangelis Koukis | if opts.purge_queues:
|
306 | d28244af | Vangelis Koukis | purge_queues() |
307 | d28244af | Vangelis Koukis | return
|
308 | d28244af | Vangelis Koukis | |
309 | d28244af | Vangelis Koukis | # Special case for the clean up exch action
|
310 | d28244af | Vangelis Koukis | if opts.purge_exchanges:
|
311 | d28244af | Vangelis Koukis | purge_exchanges() |
312 | d28244af | Vangelis Koukis | return
|
313 | d28244af | Vangelis Koukis | |
314 | d28244af | Vangelis Koukis | if opts.drain_queue:
|
315 | d28244af | Vangelis Koukis | drain_queue(opts.drain_queue) |
316 | d28244af | Vangelis Koukis | return
|
317 | d28244af | Vangelis Koukis | |
318 | 3f018af1 | Christos Stavrakakis | # Debug mode, process messages without daemonizing
|
319 | d28244af | Vangelis Koukis | if opts.debug:
|
320 | d28244af | Vangelis Koukis | debug_mode() |
321 | d28244af | Vangelis Koukis | return
|
322 | 7c62bd54 | Kostas Papadimitriou | |
323 | 3f018af1 | Christos Stavrakakis | # Create pidfile,
|
324 | 3f018af1 | Christos Stavrakakis | pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
|
325 | 3f018af1 | Christos Stavrakakis | |
326 | 3f018af1 | Christos Stavrakakis | if daemon.runner.is_pidfile_stale(pidf):
|
327 | 3f018af1 | Christos Stavrakakis | log.warning("Removing stale PID lock file %s", pidf.path)
|
328 | 3f018af1 | Christos Stavrakakis | pidf.break_lock() |
329 | 3f018af1 | Christos Stavrakakis | |
330 | 9e98ba3c | Giorgos Verigakis | files_preserve = [] |
331 | 9e98ba3c | Giorgos Verigakis | for handler in log.handlers: |
332 | 9e98ba3c | Giorgos Verigakis | stream = getattr(handler, 'stream') |
333 | 9e98ba3c | Giorgos Verigakis | if stream and hasattr(stream, 'fileno'): |
334 | 9e98ba3c | Giorgos Verigakis | files_preserve.append(handler.stream) |
335 | 7c62bd54 | Kostas Papadimitriou | |
336 | 3f018af1 | Christos Stavrakakis | stderr_stream = None
|
337 | 3f018af1 | Christos Stavrakakis | for handler in log.handlers: |
338 | 3f018af1 | Christos Stavrakakis | stream = getattr(handler, 'stream') |
339 | 3f018af1 | Christos Stavrakakis | if stream and hasattr(handler, 'baseFilename'): |
340 | 3f018af1 | Christos Stavrakakis | stderr_stream = stream |
341 | 3f018af1 | Christos Stavrakakis | break
|
342 | 7c62bd54 | Kostas Papadimitriou | |
343 | 3f018af1 | Christos Stavrakakis | daemon_context = daemon.DaemonContext( |
344 | 3f018af1 | Christos Stavrakakis | pidfile=pidf, |
345 | 3f018af1 | Christos Stavrakakis | umask=0022,
|
346 | 3f018af1 | Christos Stavrakakis | stdout=stderr_stream, |
347 | 3f018af1 | Christos Stavrakakis | stderr=stderr_stream, |
348 | 3f018af1 | Christos Stavrakakis | files_preserve=files_preserve) |
349 | 3f018af1 | Christos Stavrakakis | |
350 | 3f018af1 | Christos Stavrakakis | try:
|
351 | 3f018af1 | Christos Stavrakakis | daemon_context.open() |
352 | 3f018af1 | Christos Stavrakakis | except (pidlockfile.AlreadyLocked, LockTimeout):
|
353 | 3f018af1 | Christos Stavrakakis | log.critical("Failed to lock pidfile %s, another instance running?",
|
354 | 3f018af1 | Christos Stavrakakis | pidf.path) |
355 | 3f018af1 | Christos Stavrakakis | sys.exit(1)
|
356 | 3f018af1 | Christos Stavrakakis | |
357 | 3f018af1 | Christos Stavrakakis | log.info("Became a daemon")
|
358 | 3f018af1 | Christos Stavrakakis | |
359 | 3f018af1 | Christos Stavrakakis | if 'gevent' in sys.modules: |
360 | 3f018af1 | Christos Stavrakakis | # A fork() has occured while daemonizing. If running in
|
361 | 3f018af1 | Christos Stavrakakis | # gevent context we *must* reinit gevent
|
362 | 3f018af1 | Christos Stavrakakis | log.debug("gevent imported. Reinitializing gevent")
|
363 | 3f018af1 | Christos Stavrakakis | import gevent |
364 | 3f018af1 | Christos Stavrakakis | gevent.reinit() |
365 | d28244af | Vangelis Koukis | |
366 | d28244af | Vangelis Koukis | # Catch every exception, make sure it gets logged properly
|
367 | d28244af | Vangelis Koukis | try:
|
368 | d28244af | Vangelis Koukis | daemon_mode(opts) |
369 | d28244af | Vangelis Koukis | except Exception: |
370 | 9e98ba3c | Giorgos Verigakis | log.exception("Unknown error")
|
371 | d28244af | Vangelis Koukis | raise
|
372 | d28244af | Vangelis Koukis | |
373 | 7c62bd54 | Kostas Papadimitriou | if __name__ == "__main__": |
374 | 3d975c75 | Kostas Papadimitriou | sys.exit(main()) |
375 | 7c62bd54 | Kostas Papadimitriou | |
376 | 8d8ea051 | Georgios Gousios | # vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |