root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 8ec69269
History | View | Annotate | Download (12.2 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 48130e66 | Georgios Gousios | # Copyright 2011 GRNET S.A. All rights reserved.
|
3 | 87ace70f | Vassilios Karakoidas | #
|
4 | 48130e66 | Georgios Gousios | # Redistribution and use in source and binary forms, with or without
|
5 | 48130e66 | Georgios Gousios | # modification, are permitted provided that the following conditions
|
6 | 48130e66 | Georgios Gousios | # are met:
|
7 | 7bd50624 | Vassilios Karakoidas | #
|
8 | 48130e66 | Georgios Gousios | # 1. Redistributions of source code must retain the above copyright
|
9 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer.
|
10 | 48130e66 | Georgios Gousios | #
|
11 | 48130e66 | Georgios Gousios | # 2. Redistributions in binary form must reproduce the above copyright
|
12 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer in the
|
13 | 48130e66 | Georgios Gousios | # documentation and/or other materials provided with the distribution.
|
14 | 48130e66 | Georgios Gousios | #
|
15 | 48130e66 | Georgios Gousios | # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
16 | 48130e66 | Georgios Gousios | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
17 | 48130e66 | Georgios Gousios | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
18 | 48130e66 | Georgios Gousios | # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
19 | 48130e66 | Georgios Gousios | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
20 | 48130e66 | Georgios Gousios | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
21 | 48130e66 | Georgios Gousios | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
22 | 48130e66 | Georgios Gousios | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
23 | 48130e66 | Georgios Gousios | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
24 | 48130e66 | Georgios Gousios | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
25 | 48130e66 | Georgios Gousios | # SUCH DAMAGE.
|
26 | 48130e66 | Georgios Gousios | #
|
27 | 48130e66 | Georgios Gousios | # The views and conclusions contained in the software and documentation are
|
28 | 48130e66 | Georgios Gousios | # those of the authors and should not be interpreted as representing official
|
29 | 48130e66 | Georgios Gousios | # policies, either expressed or implied, of GRNET S.A.
|
30 | 48130e66 | Georgios Gousios | |
31 | 48130e66 | Georgios Gousios | |
32 | e6209aa2 | Georgios Gousios | """ Message queue setup, dispatch and admin
|
33 | c183005e | Georgios Gousios |
|
34 | 2cd99e7a | Georgios Gousios | This program sets up connections to the queues configured in settings.py
|
35 | 2cd99e7a | Georgios Gousios | and implements the message wait and dispatch loops. Actual messages are
|
36 | 2cd99e7a | Georgios Gousios | handled in the dispatched functions.
|
37 | fcbc5bb3 | Vassilios Karakoidas |
|
38 | d08a5f6f | Vangelis Koukis | """
|
39 | d08a5f6f | Vangelis Koukis | from django.core.management import setup_environ |
40 | c99fe4c7 | Vassilios Karakoidas | |
41 | cf2a3529 | Christos Stavrakakis | # Fix path to import synnefo settings
|
42 | d08a5f6f | Vangelis Koukis | import sys |
43 | 86221fd5 | Panos Louridas | import os |
44 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
45 | 86221fd5 | Panos Louridas | sys.path.append(path) |
46 | de470b1e | Kostas Papadimitriou | from synnefo import settings |
47 | d08a5f6f | Vangelis Koukis | setup_environ(settings) |
48 | d08a5f6f | Vangelis Koukis | |
49 | 9e98ba3c | Giorgos Verigakis | import logging |
50 | 8d8ea051 | Georgios Gousios | import time |
51 | 4ed2e471 | Georgios Gousios | |
52 | 3f018af1 | Christos Stavrakakis | import daemon |
53 | cf2a3529 | Christos Stavrakakis | import daemon.runner |
54 | cf2a3529 | Christos Stavrakakis | from lockfile import LockTimeout |
55 | 4ed2e471 | Georgios Gousios | # Take care of differences between python-daemon versions.
|
56 | 4ed2e471 | Georgios Gousios | try:
|
57 | cf2a3529 | Christos Stavrakakis | from daemon import pidfile as pidlockfile |
58 | 4ed2e471 | Georgios Gousios | except:
|
59 | 4ed2e471 | Georgios Gousios | from daemon import pidlockfile |
60 | 6c9c95d8 | Christos Stavrakakis | import setproctitle |
61 | 8d8ea051 | Georgios Gousios | |
62 | cf2a3529 | Christos Stavrakakis | from synnefo.lib.amqp import AMQPClient |
63 | 9cb903f9 | Vangelis Koukis | from synnefo.logic import callbacks |
64 | 9e98ba3c | Giorgos Verigakis | |
65 | 3f018af1 | Christos Stavrakakis | from synnefo.util.dictconfig import dictConfig |
66 | 3f018af1 | Christos Stavrakakis | dictConfig(settings.DISPATCHER_LOGGING) |
67 | 9e98ba3c | Giorgos Verigakis | log = logging.getLogger() |
68 | 9e98ba3c | Giorgos Verigakis | |
69 | 698d0666 | Georgios Gousios | # Queue names
|
70 | 698d0666 | Georgios Gousios | QUEUES = [] |
71 | 698d0666 | Georgios Gousios | |
72 | 698d0666 | Georgios Gousios | # Queue bindings to exchanges
|
73 | 698d0666 | Georgios Gousios | BINDINGS = [] |
74 | 698d0666 | Georgios Gousios | |
75 | 2bf8d695 | Vangelis Koukis | |
76 | 78e2d194 | Georgios Gousios | class Dispatcher: |
77 | 5d081749 | Georgios Gousios | debug = False
|
78 | 78e2d194 | Georgios Gousios | |
79 | 2bf8d695 | Vangelis Koukis | def __init__(self, debug=False): |
80 | 5d081749 | Georgios Gousios | self.debug = debug
|
81 | 5d081749 | Georgios Gousios | self._init()
|
82 | da102335 | Georgios Gousios | |
83 | 78e2d194 | Georgios Gousios | def wait(self): |
84 | c4e55622 | Christos Stavrakakis | log.info("Waiting for messages..")
|
85 | 78e2d194 | Georgios Gousios | while True: |
86 | 78e2d194 | Georgios Gousios | try:
|
87 | c4e55622 | Christos Stavrakakis | self.client.basic_wait()
|
88 | 78e2d194 | Georgios Gousios | except SystemExit: |
89 | 78e2d194 | Georgios Gousios | break
|
90 | c4e55622 | Christos Stavrakakis | except Exception as e: |
91 | c4e55622 | Christos Stavrakakis | log.exception("Caught unexpected exception: %s", e)
|
92 | c4e55622 | Christos Stavrakakis | |
93 | c4e55622 | Christos Stavrakakis | self.client.basic_cancel()
|
94 | c4e55622 | Christos Stavrakakis | self.client.close()
|
95 | 78e2d194 | Georgios Gousios | |
96 | 5d081749 | Georgios Gousios | def _init(self): |
97 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
98 | 9e98ba3c | Giorgos Verigakis | log.info("Initializing")
|
99 | 226f086a | Georgios Gousios | |
100 | c4e55622 | Christos Stavrakakis | self.client = AMQPClient()
|
101 | c4e55622 | Christos Stavrakakis | # Connect to AMQP host
|
102 | c4e55622 | Christos Stavrakakis | self.client.connect()
|
103 | 78e2d194 | Georgios Gousios | |
104 | c183005e | Georgios Gousios | # Declare queues and exchanges
|
105 | f30730c0 | Georgios Gousios | for exchange in settings.EXCHANGES: |
106 | db400d82 | Christos Stavrakakis | self.client.exchange_declare(exchange=exchange,
|
107 | db400d82 | Christos Stavrakakis | type="topic")
|
108 | f30730c0 | Georgios Gousios | |
109 | 226f086a | Georgios Gousios | for queue in QUEUES: |
110 | c4e55622 | Christos Stavrakakis | # Queues are mirrored to all RabbitMQ brokers
|
111 | db400d82 | Christos Stavrakakis | self.client.queue_declare(queue=queue, mirrored=True) |
112 | 78e2d194 | Georgios Gousios | |
113 | 226f086a | Georgios Gousios | bindings = BINDINGS |
114 | 78e2d194 | Georgios Gousios | |
115 | c183005e | Georgios Gousios | # Bind queues to handler methods
|
116 | 5d081749 | Georgios Gousios | for binding in bindings: |
117 | 78e2d194 | Georgios Gousios | try:
|
118 | 9cb903f9 | Vangelis Koukis | callback = getattr(callbacks, binding[3]) |
119 | 23c84263 | Georgios Gousios | except AttributeError: |
120 | 9e98ba3c | Giorgos Verigakis | log.error("Cannot find callback %s", binding[3]) |
121 | e6f5bb10 | Vangelis Koukis | raise SystemExit(1) |
122 | 8d8ea051 | Georgios Gousios | |
123 | c4e55622 | Christos Stavrakakis | self.client.queue_bind(queue=binding[0], exchange=binding[1], |
124 | c4e55622 | Christos Stavrakakis | routing_key=binding[2])
|
125 | c4e55622 | Christos Stavrakakis | |
126 | 370f69ec | Christos Stavrakakis | self.client.basic_consume(queue=binding[0], |
127 | c4e55622 | Christos Stavrakakis | callback=callback) |
128 | c4e55622 | Christos Stavrakakis | |
129 | 9e98ba3c | Giorgos Verigakis | log.debug("Binding %s(%s) to queue %s with handler %s",
|
130 | c4e55622 | Christos Stavrakakis | binding[1], binding[2], binding[0], binding[3]) |
131 | 8d8ea051 | Georgios Gousios | |
132 | c183005e | Georgios Gousios | |
133 | 698d0666 | Georgios Gousios | def _init_queues(): |
134 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
135 | 698d0666 | Georgios Gousios | |
136 | 698d0666 | Georgios Gousios | # Queue declarations
|
137 | 698d0666 | Georgios Gousios | prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
138 | 698d0666 | Georgios Gousios | |
139 | 698d0666 | Georgios Gousios | QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
|
140 | a17a8e98 | Christos Stavrakakis | QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
|
141 | 698d0666 | Georgios Gousios | QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
|
142 | 9068cd85 | Georgios Gousios | QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
|
143 | 698d0666 | Georgios Gousios | QUEUE_RECONC = "%s-reconciliation" % prefix
|
144 | 698d0666 | Georgios Gousios | if settings.DEBUG is True: |
145 | b47b895d | Christos Stavrakakis | QUEUE_DEBUG = "%s-debug" % prefix # Debug queue, retrieves all messages |
146 | 698d0666 | Georgios Gousios | |
147 | a17a8e98 | Christos Stavrakakis | QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC, |
148 | fd56d250 | Giorgos Verigakis | QUEUE_GANETI_BUILD_PROGR) |
149 | 698d0666 | Georgios Gousios | |
150 | 698d0666 | Georgios Gousios | # notifications of type "ganeti-op-status"
|
151 | 2bf8d695 | Vangelis Koukis | DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
|
152 | a17a8e98 | Christos Stavrakakis | # notifications of type "ganeti-network-status"
|
153 | a17a8e98 | Christos Stavrakakis | DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
|
154 | 698d0666 | Georgios Gousios | # notifications of type "ganeti-net-status"
|
155 | 2bf8d695 | Vangelis Koukis | DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
|
156 | e6f5bb10 | Vangelis Koukis | # notifications of type "ganeti-create-progress"
|
157 | e6f5bb10 | Vangelis Koukis | BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
|
158 | 0230cd3a | Vangelis Koukis | # reconciliation
|
159 | 633d9cfa | Georgios Gousios | RECONC_HANDLER = 'reconciliation.%s.*' % prefix
|
160 | 698d0666 | Georgios Gousios | |
161 | 698d0666 | Georgios Gousios | BINDINGS = [ |
162 | 9068cd85 | Georgios Gousios | # Queue # Exchange # RouteKey # Handler
|
163 | 9068cd85 | Georgios Gousios | (QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'),
|
164 | a17a8e98 | Christos Stavrakakis | (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
|
165 | 9068cd85 | Georgios Gousios | (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
|
166 | cf2a3529 | Christos Stavrakakis | (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'),
|
167 | 698d0666 | Georgios Gousios | ] |
168 | 698d0666 | Georgios Gousios | |
169 | 698d0666 | Georgios Gousios | if settings.DEBUG is True: |
170 | 698d0666 | Georgios Gousios | BINDINGS += [ |
171 | 698d0666 | Georgios Gousios | # Queue # Exchange # RouteKey # Handler
|
172 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
173 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
174 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
175 | 698d0666 | Georgios Gousios | ] |
176 | 698d0666 | Georgios Gousios | QUEUES += (QUEUE_DEBUG,) |
177 | 698d0666 | Georgios Gousios | |
178 | 698d0666 | Georgios Gousios | |
179 | 78e2d194 | Georgios Gousios | def parse_arguments(args): |
180 | 78e2d194 | Georgios Gousios | from optparse import OptionParser |
181 | 78e2d194 | Georgios Gousios | |
182 | 3f018af1 | Christos Stavrakakis | default_pid_file = \ |
183 | 3f018af1 | Christos Stavrakakis | os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:] |
184 | 78e2d194 | Georgios Gousios | parser = OptionParser() |
185 | c183005e | Georgios Gousios | parser.add_option("-d", "--debug", action="store_true", default=False, |
186 | 2cd99e7a | Georgios Gousios | dest="debug", help="Enable debug mode") |
187 | e6209aa2 | Georgios Gousios | parser.add_option("-w", "--workers", default=2, dest="workers", |
188 | e6209aa2 | Georgios Gousios | help="Number of workers to spawn", type="int") |
189 | 3d975c75 | Kostas Papadimitriou | parser.add_option("-p", "--pid-file", dest="pid_file", |
190 | 3d975c75 | Kostas Papadimitriou | default=default_pid_file, |
191 | 3d975c75 | Kostas Papadimitriou | help="Save PID to file (default: %s)" % default_pid_file)
|
192 | 979482ce | Georgios Gousios | parser.add_option("--purge-queues", action="store_true", |
193 | 979482ce | Georgios Gousios | default=False, dest="purge_queues", |
194 | 57d0082a | Georgios Gousios | help="Remove all declared queues (DANGEROUS!)")
|
195 | 979482ce | Georgios Gousios | parser.add_option("--purge-exchanges", action="store_true", |
196 | 979482ce | Georgios Gousios | default=False, dest="purge_exchanges", |
197 | 979482ce | Georgios Gousios | help="Remove all exchanges. Implies deleting all queues \
|
198 | 979482ce | Georgios Gousios | first (DANGEROUS!)")
|
199 | d5470cdd | Georgios Gousios | parser.add_option("--drain-queue", dest="drain_queue", |
200 | e6209aa2 | Georgios Gousios | help="Strips a queue from all outstanding messages")
|
201 | de081774 | Georgios Gousios | |
202 | 78e2d194 | Georgios Gousios | return parser.parse_args(args)
|
203 | 78e2d194 | Georgios Gousios | |
204 | f30730c0 | Georgios Gousios | |
205 | d28244af | Vangelis Koukis | def purge_queues(): |
206 | 979482ce | Georgios Gousios | """
|
207 | 979482ce | Georgios Gousios | Delete declared queues from RabbitMQ. Use with care!
|
208 | 979482ce | Georgios Gousios | """
|
209 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
210 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
211 | c4e55622 | Christos Stavrakakis | client.connect() |
212 | f30730c0 | Georgios Gousios | |
213 | 698d0666 | Georgios Gousios | print "Queues to be deleted: ", QUEUES |
214 | f30730c0 | Georgios Gousios | |
215 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
216 | f30730c0 | Georgios Gousios | return
|
217 | f30730c0 | Georgios Gousios | |
218 | 698d0666 | Georgios Gousios | for queue in QUEUES: |
219 | c4e55622 | Christos Stavrakakis | result = client.queue_delete(queue=queue) |
220 | c4e55622 | Christos Stavrakakis | print "Deleting queue %s. Result: %s" % (queue, result) |
221 | 979482ce | Georgios Gousios | |
222 | c4e55622 | Christos Stavrakakis | client.close() |
223 | 979482ce | Georgios Gousios | |
224 | 979482ce | Georgios Gousios | |
225 | 979482ce | Georgios Gousios | def purge_exchanges(): |
226 | e6f5bb10 | Vangelis Koukis | """Delete declared exchanges from RabbitMQ, after removing all queues"""
|
227 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
228 | 979482ce | Georgios Gousios | purge_queues() |
229 | 979482ce | Georgios Gousios | |
230 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
231 | c4e55622 | Christos Stavrakakis | client.connect() |
232 | 979482ce | Georgios Gousios | |
233 | e6f5bb10 | Vangelis Koukis | print "Exchanges to be deleted: ", settings.EXCHANGES |
234 | 979482ce | Georgios Gousios | |
235 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
236 | 979482ce | Georgios Gousios | return
|
237 | 979482ce | Georgios Gousios | |
238 | 979482ce | Georgios Gousios | for exchange in settings.EXCHANGES: |
239 | c4e55622 | Christos Stavrakakis | result = client.exchange_delete(exchange=exchange) |
240 | c4e55622 | Christos Stavrakakis | print "Deleting exchange %s. Result: %s" % (exchange, result) |
241 | 979482ce | Georgios Gousios | |
242 | c4e55622 | Christos Stavrakakis | client.close() |
243 | f30730c0 | Georgios Gousios | |
244 | c183005e | Georgios Gousios | |
245 | 979482ce | Georgios Gousios | def drain_queue(queue): |
246 | e6f5bb10 | Vangelis Koukis | """Strip a (declared) queue from all outstanding messages"""
|
247 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
248 | 979482ce | Georgios Gousios | if not queue: |
249 | 979482ce | Georgios Gousios | return
|
250 | 979482ce | Georgios Gousios | |
251 | 698d0666 | Georgios Gousios | if not queue in QUEUES: |
252 | 979482ce | Georgios Gousios | print "Queue %s not configured" % queue |
253 | 979482ce | Georgios Gousios | return
|
254 | 979482ce | Georgios Gousios | |
255 | 979482ce | Georgios Gousios | print "Queue to be drained: %s" % queue |
256 | 979482ce | Georgios Gousios | |
257 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
258 | 979482ce | Georgios Gousios | return
|
259 | c4e55622 | Christos Stavrakakis | |
260 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
261 | c4e55622 | Christos Stavrakakis | client.connect() |
262 | 979482ce | Georgios Gousios | |
263 | c4e55622 | Christos Stavrakakis | tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
264 | e6209aa2 | Georgios Gousios | |
265 | e6209aa2 | Georgios Gousios | print "Queue draining about to start, hit Ctrl+c when done" |
266 | e6209aa2 | Georgios Gousios | time.sleep(2)
|
267 | e6209aa2 | Georgios Gousios | print "Queue draining starting" |
268 | e6209aa2 | Georgios Gousios | |
269 | c626e1d0 | Georgios Gousios | num_processed = 0
|
270 | e6209aa2 | Georgios Gousios | while True: |
271 | c4e55622 | Christos Stavrakakis | client.basic_wait() |
272 | c626e1d0 | Georgios Gousios | num_processed += 1
|
273 | c626e1d0 | Georgios Gousios | sys.stderr.write("Ignored %d messages\r" % num_processed)
|
274 | c626e1d0 | Georgios Gousios | |
275 | c4e55622 | Christos Stavrakakis | client.basic_cancel(tag) |
276 | c4e55622 | Christos Stavrakakis | client.close() |
277 | 95aee02c | Vangelis Koukis | |
278 | 979482ce | Georgios Gousios | |
279 | 979482ce | Georgios Gousios | def get_user_confirmation(): |
280 | 979482ce | Georgios Gousios | ans = raw_input("Are you sure (N/y):") |
281 | 979482ce | Georgios Gousios | |
282 | 979482ce | Georgios Gousios | if not ans: |
283 | 979482ce | Georgios Gousios | return False |
284 | 979482ce | Georgios Gousios | if ans not in ['Y', 'y']: |
285 | 979482ce | Georgios Gousios | return False |
286 | 979482ce | Georgios Gousios | return True |
287 | 979482ce | Georgios Gousios | |
288 | 979482ce | Georgios Gousios | |
289 | 57d0082a | Georgios Gousios | def debug_mode(): |
290 | 2bf8d695 | Vangelis Koukis | disp = Dispatcher(debug=True)
|
291 | 838239fa | Georgios Gousios | disp.wait() |
292 | 838239fa | Georgios Gousios | |
293 | 838239fa | Georgios Gousios | |
294 | d28244af | Vangelis Koukis | def daemon_mode(opts): |
295 | 3f018af1 | Christos Stavrakakis | disp = Dispatcher(debug=False)
|
296 | 3f018af1 | Christos Stavrakakis | disp.wait() |
297 | c183005e | Georgios Gousios | |
298 | d28244af | Vangelis Koukis | |
299 | d28244af | Vangelis Koukis | def main(): |
300 | cf2a3529 | Christos Stavrakakis | (opts, args) = parse_arguments(sys.argv[1:])
|
301 | cf2a3529 | Christos Stavrakakis | |
302 | 6c9c95d8 | Christos Stavrakakis | # Rename this process so 'ps' output looks like this is a native
|
303 | 6c9c95d8 | Christos Stavrakakis | # executable. Can not seperate command-line arguments from actual name of
|
304 | 6c9c95d8 | Christos Stavrakakis | # the executable by NUL bytes, so only show the name of the executable
|
305 | 6c9c95d8 | Christos Stavrakakis | # instead. setproctitle.setproctitle("\x00".join(sys.argv))
|
306 | 6c9c95d8 | Christos Stavrakakis | setproctitle.setproctitle(sys.argv[0])
|
307 | 6c9c95d8 | Christos Stavrakakis | |
308 | 8ec69269 | Christos Stavrakakis | if opts.debug:
|
309 | 8ec69269 | Christos Stavrakakis | stream_handler = logging.StreamHandler() |
310 | 8ec69269 | Christos Stavrakakis | formatter = logging.Formatter("%(asctime)s %(module)s %(levelname)s: %(message)s",
|
311 | 8ec69269 | Christos Stavrakakis | "%Y-%m-%d %H:%M:%S")
|
312 | 8ec69269 | Christos Stavrakakis | stream_handler.setFormatter(formatter) |
313 | 8ec69269 | Christos Stavrakakis | log.addHandler(stream_handler) |
314 | 8ec69269 | Christos Stavrakakis | |
315 | d28244af | Vangelis Koukis | # Init the global variables containing the queues
|
316 | d28244af | Vangelis Koukis | _init_queues() |
317 | d28244af | Vangelis Koukis | |
318 | d28244af | Vangelis Koukis | # Special case for the clean up queues action
|
319 | d28244af | Vangelis Koukis | if opts.purge_queues:
|
320 | d28244af | Vangelis Koukis | purge_queues() |
321 | d28244af | Vangelis Koukis | return
|
322 | d28244af | Vangelis Koukis | |
323 | d28244af | Vangelis Koukis | # Special case for the clean up exch action
|
324 | d28244af | Vangelis Koukis | if opts.purge_exchanges:
|
325 | d28244af | Vangelis Koukis | purge_exchanges() |
326 | d28244af | Vangelis Koukis | return
|
327 | d28244af | Vangelis Koukis | |
328 | d28244af | Vangelis Koukis | if opts.drain_queue:
|
329 | d28244af | Vangelis Koukis | drain_queue(opts.drain_queue) |
330 | d28244af | Vangelis Koukis | return
|
331 | d28244af | Vangelis Koukis | |
332 | 3f018af1 | Christos Stavrakakis | # Debug mode, process messages without daemonizing
|
333 | d28244af | Vangelis Koukis | if opts.debug:
|
334 | d28244af | Vangelis Koukis | debug_mode() |
335 | d28244af | Vangelis Koukis | return
|
336 | 7c62bd54 | Kostas Papadimitriou | |
337 | 3f018af1 | Christos Stavrakakis | # Create pidfile,
|
338 | 3f018af1 | Christos Stavrakakis | pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
|
339 | 3f018af1 | Christos Stavrakakis | |
340 | 3f018af1 | Christos Stavrakakis | if daemon.runner.is_pidfile_stale(pidf):
|
341 | 3f018af1 | Christos Stavrakakis | log.warning("Removing stale PID lock file %s", pidf.path)
|
342 | 3f018af1 | Christos Stavrakakis | pidf.break_lock() |
343 | 3f018af1 | Christos Stavrakakis | |
344 | 9e98ba3c | Giorgos Verigakis | files_preserve = [] |
345 | 9e98ba3c | Giorgos Verigakis | for handler in log.handlers: |
346 | 9e98ba3c | Giorgos Verigakis | stream = getattr(handler, 'stream') |
347 | 9e98ba3c | Giorgos Verigakis | if stream and hasattr(stream, 'fileno'): |
348 | 9e98ba3c | Giorgos Verigakis | files_preserve.append(handler.stream) |
349 | 7c62bd54 | Kostas Papadimitriou | |
350 | 3f018af1 | Christos Stavrakakis | stderr_stream = None
|
351 | 3f018af1 | Christos Stavrakakis | for handler in log.handlers: |
352 | 3f018af1 | Christos Stavrakakis | stream = getattr(handler, 'stream') |
353 | 3f018af1 | Christos Stavrakakis | if stream and hasattr(handler, 'baseFilename'): |
354 | 3f018af1 | Christos Stavrakakis | stderr_stream = stream |
355 | 3f018af1 | Christos Stavrakakis | break
|
356 | 7c62bd54 | Kostas Papadimitriou | |
357 | 3f018af1 | Christos Stavrakakis | daemon_context = daemon.DaemonContext( |
358 | 3f018af1 | Christos Stavrakakis | pidfile=pidf, |
359 | 3f018af1 | Christos Stavrakakis | umask=0022,
|
360 | 3f018af1 | Christos Stavrakakis | stdout=stderr_stream, |
361 | 3f018af1 | Christos Stavrakakis | stderr=stderr_stream, |
362 | 3f018af1 | Christos Stavrakakis | files_preserve=files_preserve) |
363 | 3f018af1 | Christos Stavrakakis | |
364 | 3f018af1 | Christos Stavrakakis | try:
|
365 | 3f018af1 | Christos Stavrakakis | daemon_context.open() |
366 | 3f018af1 | Christos Stavrakakis | except (pidlockfile.AlreadyLocked, LockTimeout):
|
367 | 3f018af1 | Christos Stavrakakis | log.critical("Failed to lock pidfile %s, another instance running?",
|
368 | 3f018af1 | Christos Stavrakakis | pidf.path) |
369 | 3f018af1 | Christos Stavrakakis | sys.exit(1)
|
370 | 3f018af1 | Christos Stavrakakis | |
371 | 3f018af1 | Christos Stavrakakis | log.info("Became a daemon")
|
372 | 3f018af1 | Christos Stavrakakis | |
373 | 3f018af1 | Christos Stavrakakis | if 'gevent' in sys.modules: |
374 | 3f018af1 | Christos Stavrakakis | # A fork() has occured while daemonizing. If running in
|
375 | 3f018af1 | Christos Stavrakakis | # gevent context we *must* reinit gevent
|
376 | 3f018af1 | Christos Stavrakakis | log.debug("gevent imported. Reinitializing gevent")
|
377 | 3f018af1 | Christos Stavrakakis | import gevent |
378 | 3f018af1 | Christos Stavrakakis | gevent.reinit() |
379 | d28244af | Vangelis Koukis | |
380 | d28244af | Vangelis Koukis | # Catch every exception, make sure it gets logged properly
|
381 | d28244af | Vangelis Koukis | try:
|
382 | d28244af | Vangelis Koukis | daemon_mode(opts) |
383 | d28244af | Vangelis Koukis | except Exception: |
384 | 9e98ba3c | Giorgos Verigakis | log.exception("Unknown error")
|
385 | d28244af | Vangelis Koukis | raise
|
386 | d28244af | Vangelis Koukis | |
387 | 7c62bd54 | Kostas Papadimitriou | if __name__ == "__main__": |
388 | 3d975c75 | Kostas Papadimitriou | sys.exit(main()) |
389 | 7c62bd54 | Kostas Papadimitriou | |
390 | 8d8ea051 | Georgios Gousios | # vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |