root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 22ee6892
History | View | Annotate | Download (12.9 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 48130e66 | Georgios Gousios | # Copyright 2011 GRNET S.A. All rights reserved.
|
3 | 87ace70f | Vassilios Karakoidas | #
|
4 | 48130e66 | Georgios Gousios | # Redistribution and use in source and binary forms, with or without
|
5 | 48130e66 | Georgios Gousios | # modification, are permitted provided that the following conditions
|
6 | 48130e66 | Georgios Gousios | # are met:
|
7 | 7bd50624 | Vassilios Karakoidas | #
|
8 | 48130e66 | Georgios Gousios | # 1. Redistributions of source code must retain the above copyright
|
9 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer.
|
10 | 48130e66 | Georgios Gousios | #
|
11 | 48130e66 | Georgios Gousios | # 2. Redistributions in binary form must reproduce the above copyright
|
12 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer in the
|
13 | 48130e66 | Georgios Gousios | # documentation and/or other materials provided with the distribution.
|
14 | 48130e66 | Georgios Gousios | #
|
15 | 48130e66 | Georgios Gousios | # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
16 | 48130e66 | Georgios Gousios | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
17 | 48130e66 | Georgios Gousios | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
18 | 48130e66 | Georgios Gousios | # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
19 | 48130e66 | Georgios Gousios | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
20 | 48130e66 | Georgios Gousios | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
21 | 48130e66 | Georgios Gousios | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
22 | 48130e66 | Georgios Gousios | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
23 | 48130e66 | Georgios Gousios | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
24 | 48130e66 | Georgios Gousios | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
25 | 48130e66 | Georgios Gousios | # SUCH DAMAGE.
|
26 | 48130e66 | Georgios Gousios | #
|
27 | 48130e66 | Georgios Gousios | # The views and conclusions contained in the software and documentation are
|
28 | 48130e66 | Georgios Gousios | # those of the authors and should not be interpreted as representing official
|
29 | 48130e66 | Georgios Gousios | # policies, either expressed or implied, of GRNET S.A.
|
30 | 48130e66 | Georgios Gousios | |
31 | 48130e66 | Georgios Gousios | |
32 | e6209aa2 | Georgios Gousios | """ Message queue setup, dispatch and admin
|
33 | c183005e | Georgios Gousios |
|
34 | 2cd99e7a | Georgios Gousios | This program sets up connections to the queues configured in settings.py
|
35 | 2cd99e7a | Georgios Gousios | and implements the message wait and dispatch loops. Actual messages are
|
36 | 2cd99e7a | Georgios Gousios | handled in the dispatched functions.
|
37 | fcbc5bb3 | Vassilios Karakoidas |
|
38 | d08a5f6f | Vangelis Koukis | """
|
39 | d08a5f6f | Vangelis Koukis | from django.core.management import setup_environ |
40 | c99fe4c7 | Vassilios Karakoidas | |
41 | cf2a3529 | Christos Stavrakakis | # Fix path to import synnefo settings
|
42 | d08a5f6f | Vangelis Koukis | import sys |
43 | 86221fd5 | Panos Louridas | import os |
44 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
45 | 86221fd5 | Panos Louridas | sys.path.append(path) |
46 | d08a5f6f | Vangelis Koukis | |
47 | de470b1e | Kostas Papadimitriou | from synnefo import settings |
48 | d08a5f6f | Vangelis Koukis | setup_environ(settings) |
49 | d08a5f6f | Vangelis Koukis | |
50 | 9e98ba3c | Giorgos Verigakis | import logging |
51 | 8d8ea051 | Georgios Gousios | import time |
52 | 4ed2e471 | Georgios Gousios | |
53 | cf2a3529 | Christos Stavrakakis | import daemon.runner |
54 | cf2a3529 | Christos Stavrakakis | import daemon.daemon |
55 | cf2a3529 | Christos Stavrakakis | from lockfile import LockTimeout |
56 | cf2a3529 | Christos Stavrakakis | from signal import signal, SIGINT, SIGTERM |
57 | c4e55622 | Christos Stavrakakis | |
58 | 4ed2e471 | Georgios Gousios | # Take care of differences between python-daemon versions.
|
59 | 4ed2e471 | Georgios Gousios | try:
|
60 | cf2a3529 | Christos Stavrakakis | from daemon import pidfile as pidlockfile |
61 | 4ed2e471 | Georgios Gousios | except:
|
62 | 4ed2e471 | Georgios Gousios | from daemon import pidlockfile |
63 | 8d8ea051 | Georgios Gousios | |
64 | cf2a3529 | Christos Stavrakakis | from synnefo.lib.amqp import AMQPClient |
65 | 9cb903f9 | Vangelis Koukis | from synnefo.logic import callbacks |
66 | 9e98ba3c | Giorgos Verigakis | from synnefo.util.dictconfig import dictConfig |
67 | 9e98ba3c | Giorgos Verigakis | |
68 | 9e98ba3c | Giorgos Verigakis | |
69 | 9e98ba3c | Giorgos Verigakis | log = logging.getLogger() |
70 | 9e98ba3c | Giorgos Verigakis | |
71 | c99fe4c7 | Vassilios Karakoidas | |
72 | 698d0666 | Georgios Gousios | # Queue names
|
73 | 698d0666 | Georgios Gousios | QUEUES = [] |
74 | 698d0666 | Georgios Gousios | |
75 | 698d0666 | Georgios Gousios | # Queue bindings to exchanges
|
76 | 698d0666 | Georgios Gousios | BINDINGS = [] |
77 | 698d0666 | Georgios Gousios | |
78 | 2bf8d695 | Vangelis Koukis | |
79 | 78e2d194 | Georgios Gousios | class Dispatcher: |
80 | 5d081749 | Georgios Gousios | debug = False
|
81 | c4e55622 | Christos Stavrakakis | client_promises = [] |
82 | 78e2d194 | Georgios Gousios | |
83 | 2bf8d695 | Vangelis Koukis | def __init__(self, debug=False): |
84 | 5d081749 | Georgios Gousios | self.debug = debug
|
85 | 5d081749 | Georgios Gousios | self._init()
|
86 | da102335 | Georgios Gousios | |
87 | 78e2d194 | Georgios Gousios | def wait(self): |
88 | c4e55622 | Christos Stavrakakis | log.info("Waiting for messages..")
|
89 | 78e2d194 | Georgios Gousios | while True: |
90 | 78e2d194 | Georgios Gousios | try:
|
91 | c4e55622 | Christos Stavrakakis | self.client.basic_wait()
|
92 | 78e2d194 | Georgios Gousios | except SystemExit: |
93 | 78e2d194 | Georgios Gousios | break
|
94 | c4e55622 | Christos Stavrakakis | except Exception as e: |
95 | c4e55622 | Christos Stavrakakis | log.exception("Caught unexpected exception: %s", e)
|
96 | c4e55622 | Christos Stavrakakis | |
97 | c4e55622 | Christos Stavrakakis | self.client.basic_cancel()
|
98 | c4e55622 | Christos Stavrakakis | self.client.close()
|
99 | 78e2d194 | Georgios Gousios | |
100 | 5d081749 | Georgios Gousios | def _init(self): |
101 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
102 | 9e98ba3c | Giorgos Verigakis | log.info("Initializing")
|
103 | 226f086a | Georgios Gousios | |
104 | c4e55622 | Christos Stavrakakis | self.client = AMQPClient()
|
105 | c4e55622 | Christos Stavrakakis | # Connect to AMQP host
|
106 | c4e55622 | Christos Stavrakakis | self.client.connect()
|
107 | 78e2d194 | Georgios Gousios | |
108 | c183005e | Georgios Gousios | # Declare queues and exchanges
|
109 | f30730c0 | Georgios Gousios | for exchange in settings.EXCHANGES: |
110 | db400d82 | Christos Stavrakakis | self.client.exchange_declare(exchange=exchange,
|
111 | db400d82 | Christos Stavrakakis | type="topic")
|
112 | f30730c0 | Georgios Gousios | |
113 | 226f086a | Georgios Gousios | for queue in QUEUES: |
114 | c4e55622 | Christos Stavrakakis | # Queues are mirrored to all RabbitMQ brokers
|
115 | db400d82 | Christos Stavrakakis | self.client.queue_declare(queue=queue, mirrored=True) |
116 | 78e2d194 | Georgios Gousios | |
117 | 226f086a | Georgios Gousios | bindings = BINDINGS |
118 | 78e2d194 | Georgios Gousios | |
119 | c183005e | Georgios Gousios | # Bind queues to handler methods
|
120 | 5d081749 | Georgios Gousios | for binding in bindings: |
121 | 78e2d194 | Georgios Gousios | try:
|
122 | 9cb903f9 | Vangelis Koukis | callback = getattr(callbacks, binding[3]) |
123 | 23c84263 | Georgios Gousios | except AttributeError: |
124 | 9e98ba3c | Giorgos Verigakis | log.error("Cannot find callback %s", binding[3]) |
125 | e6f5bb10 | Vangelis Koukis | raise SystemExit(1) |
126 | 8d8ea051 | Georgios Gousios | |
127 | c4e55622 | Christos Stavrakakis | self.client.queue_bind(queue=binding[0], exchange=binding[1], |
128 | c4e55622 | Christos Stavrakakis | routing_key=binding[2])
|
129 | c4e55622 | Christos Stavrakakis | |
130 | c4e55622 | Christos Stavrakakis | consume_promise = self.client.basic_consume(queue=binding[0], |
131 | c4e55622 | Christos Stavrakakis | callback=callback) |
132 | c4e55622 | Christos Stavrakakis | |
133 | 9e98ba3c | Giorgos Verigakis | log.debug("Binding %s(%s) to queue %s with handler %s",
|
134 | c4e55622 | Christos Stavrakakis | binding[1], binding[2], binding[0], binding[3]) |
135 | c4e55622 | Christos Stavrakakis | self.client_promises.append(consume_promise)
|
136 | 8d8ea051 | Georgios Gousios | |
137 | c183005e | Georgios Gousios | |
138 | 698d0666 | Georgios Gousios | def _init_queues(): |
139 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
140 | 698d0666 | Georgios Gousios | |
141 | 698d0666 | Georgios Gousios | # Queue declarations
|
142 | 698d0666 | Georgios Gousios | prefix = settings.BACKEND_PREFIX_ID.split('-')[0] |
143 | 698d0666 | Georgios Gousios | |
144 | 698d0666 | Georgios Gousios | QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
|
145 | a17a8e98 | Christos Stavrakakis | QUEUE_GANETI_EVENTS_NETWORK = "%s-events-network" % prefix
|
146 | 698d0666 | Georgios Gousios | QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
|
147 | 9068cd85 | Georgios Gousios | QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
|
148 | 698d0666 | Georgios Gousios | QUEUE_RECONC = "%s-reconciliation" % prefix
|
149 | 698d0666 | Georgios Gousios | if settings.DEBUG is True: |
150 | b47b895d | Christos Stavrakakis | QUEUE_DEBUG = "%s-debug" % prefix # Debug queue, retrieves all messages |
151 | 698d0666 | Georgios Gousios | |
152 | a17a8e98 | Christos Stavrakakis | QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NETWORK, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC, |
153 | fd56d250 | Giorgos Verigakis | QUEUE_GANETI_BUILD_PROGR) |
154 | 698d0666 | Georgios Gousios | |
155 | 698d0666 | Georgios Gousios | # notifications of type "ganeti-op-status"
|
156 | 2bf8d695 | Vangelis Koukis | DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
|
157 | a17a8e98 | Christos Stavrakakis | # notifications of type "ganeti-network-status"
|
158 | a17a8e98 | Christos Stavrakakis | DB_HANDLER_KEY_NETWORK = 'ganeti.%s.event.network' % prefix
|
159 | 698d0666 | Georgios Gousios | # notifications of type "ganeti-net-status"
|
160 | 2bf8d695 | Vangelis Koukis | DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
|
161 | e6f5bb10 | Vangelis Koukis | # notifications of type "ganeti-create-progress"
|
162 | e6f5bb10 | Vangelis Koukis | BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
|
163 | 0230cd3a | Vangelis Koukis | # reconciliation
|
164 | 633d9cfa | Georgios Gousios | RECONC_HANDLER = 'reconciliation.%s.*' % prefix
|
165 | 698d0666 | Georgios Gousios | |
166 | 698d0666 | Georgios Gousios | BINDINGS = [ |
167 | 9068cd85 | Georgios Gousios | # Queue # Exchange # RouteKey # Handler
|
168 | 9068cd85 | Georgios Gousios | (QUEUE_GANETI_EVENTS_OP, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP, 'update_db'),
|
169 | a17a8e98 | Christos Stavrakakis | (QUEUE_GANETI_EVENTS_NETWORK, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NETWORK, 'update_network'),
|
170 | 9068cd85 | Georgios Gousios | (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET, 'update_net'),
|
171 | cf2a3529 | Christos Stavrakakis | (QUEUE_GANETI_BUILD_PROGR, settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER, 'update_build_progress'),
|
172 | 698d0666 | Georgios Gousios | ] |
173 | 698d0666 | Georgios Gousios | |
174 | 698d0666 | Georgios Gousios | if settings.DEBUG is True: |
175 | 698d0666 | Georgios Gousios | BINDINGS += [ |
176 | 698d0666 | Georgios Gousios | # Queue # Exchange # RouteKey # Handler
|
177 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#', 'dummy_proc'), |
178 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_CRON, '#', 'dummy_proc'), |
179 | 698d0666 | Georgios Gousios | (QUEUE_DEBUG, settings.EXCHANGE_API, '#', 'dummy_proc'), |
180 | 698d0666 | Georgios Gousios | ] |
181 | 698d0666 | Georgios Gousios | QUEUES += (QUEUE_DEBUG,) |
182 | 698d0666 | Georgios Gousios | |
183 | 698d0666 | Georgios Gousios | |
184 | c183005e | Georgios Gousios | def _exit_handler(signum, frame): |
185 | d28244af | Vangelis Koukis | """"Catch exit signal in children processes"""
|
186 | 9e98ba3c | Giorgos Verigakis | log.info("Caught signal %d, will raise SystemExit", signum)
|
187 | 8d8ea051 | Georgios Gousios | raise SystemExit |
188 | 8d8ea051 | Georgios Gousios | |
189 | c183005e | Georgios Gousios | |
190 | c183005e | Georgios Gousios | def _parent_handler(signum, frame): |
191 | c183005e | Georgios Gousios | """"Catch exit signal in parent process and forward it to children."""
|
192 | 9e98ba3c | Giorgos Verigakis | global children
|
193 | 9e98ba3c | Giorgos Verigakis | log.info("Caught signal %d, sending SIGTERM to children %s",
|
194 | cf2a3529 | Christos Stavrakakis | signum, children) |
195 | 8861126f | Georgios Gousios | [os.kill(pid, SIGTERM) for pid in children] |
196 | 8861126f | Georgios Gousios | |
197 | c183005e | Georgios Gousios | |
198 | 57d0082a | Georgios Gousios | def child(cmdline): |
199 | c183005e | Georgios Gousios | """The context of the child process"""
|
200 | c183005e | Georgios Gousios | |
201 | c183005e | Georgios Gousios | # Cmd line argument parsing
|
202 | 78e2d194 | Georgios Gousios | (opts, args) = parse_arguments(cmdline) |
203 | d28244af | Vangelis Koukis | disp = Dispatcher(debug=opts.debug) |
204 | 78e2d194 | Georgios Gousios | |
205 | c183005e | Georgios Gousios | # Start the event loop
|
206 | 2cd99e7a | Georgios Gousios | disp.wait() |
207 | 78e2d194 | Georgios Gousios | |
208 | c183005e | Georgios Gousios | |
209 | 78e2d194 | Georgios Gousios | def parse_arguments(args): |
210 | 78e2d194 | Georgios Gousios | from optparse import OptionParser |
211 | 78e2d194 | Georgios Gousios | |
212 | cf2a3529 | Christos Stavrakakis | default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid") |
213 | 78e2d194 | Georgios Gousios | parser = OptionParser() |
214 | c183005e | Georgios Gousios | parser.add_option("-d", "--debug", action="store_true", default=False, |
215 | 2cd99e7a | Georgios Gousios | dest="debug", help="Enable debug mode") |
216 | e6209aa2 | Georgios Gousios | parser.add_option("-w", "--workers", default=2, dest="workers", |
217 | e6209aa2 | Georgios Gousios | help="Number of workers to spawn", type="int") |
218 | 3d975c75 | Kostas Papadimitriou | parser.add_option("-p", "--pid-file", dest="pid_file", |
219 | 3d975c75 | Kostas Papadimitriou | default=default_pid_file, |
220 | 3d975c75 | Kostas Papadimitriou | help="Save PID to file (default: %s)" % default_pid_file)
|
221 | 979482ce | Georgios Gousios | parser.add_option("--purge-queues", action="store_true", |
222 | 979482ce | Georgios Gousios | default=False, dest="purge_queues", |
223 | 57d0082a | Georgios Gousios | help="Remove all declared queues (DANGEROUS!)")
|
224 | 979482ce | Georgios Gousios | parser.add_option("--purge-exchanges", action="store_true", |
225 | 979482ce | Georgios Gousios | default=False, dest="purge_exchanges", |
226 | 979482ce | Georgios Gousios | help="Remove all exchanges. Implies deleting all queues \
|
227 | 979482ce | Georgios Gousios | first (DANGEROUS!)")
|
228 | d5470cdd | Georgios Gousios | parser.add_option("--drain-queue", dest="drain_queue", |
229 | e6209aa2 | Georgios Gousios | help="Strips a queue from all outstanding messages")
|
230 | de081774 | Georgios Gousios | |
231 | 78e2d194 | Georgios Gousios | return parser.parse_args(args)
|
232 | 78e2d194 | Georgios Gousios | |
233 | f30730c0 | Georgios Gousios | |
234 | d28244af | Vangelis Koukis | def purge_queues(): |
235 | 979482ce | Georgios Gousios | """
|
236 | 979482ce | Georgios Gousios | Delete declared queues from RabbitMQ. Use with care!
|
237 | 979482ce | Georgios Gousios | """
|
238 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
239 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
240 | c4e55622 | Christos Stavrakakis | client.connect() |
241 | f30730c0 | Georgios Gousios | |
242 | 698d0666 | Georgios Gousios | print "Queues to be deleted: ", QUEUES |
243 | f30730c0 | Georgios Gousios | |
244 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
245 | f30730c0 | Georgios Gousios | return
|
246 | f30730c0 | Georgios Gousios | |
247 | 698d0666 | Georgios Gousios | for queue in QUEUES: |
248 | c4e55622 | Christos Stavrakakis | result = client.queue_delete(queue=queue) |
249 | c4e55622 | Christos Stavrakakis | print "Deleting queue %s. Result: %s" % (queue, result) |
250 | 979482ce | Georgios Gousios | |
251 | c4e55622 | Christos Stavrakakis | client.close() |
252 | 979482ce | Georgios Gousios | |
253 | 979482ce | Georgios Gousios | |
254 | 979482ce | Georgios Gousios | def purge_exchanges(): |
255 | e6f5bb10 | Vangelis Koukis | """Delete declared exchanges from RabbitMQ, after removing all queues"""
|
256 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
257 | 979482ce | Georgios Gousios | purge_queues() |
258 | 979482ce | Georgios Gousios | |
259 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
260 | c4e55622 | Christos Stavrakakis | client.connect() |
261 | 979482ce | Georgios Gousios | |
262 | e6f5bb10 | Vangelis Koukis | print "Exchanges to be deleted: ", settings.EXCHANGES |
263 | 979482ce | Georgios Gousios | |
264 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
265 | 979482ce | Georgios Gousios | return
|
266 | 979482ce | Georgios Gousios | |
267 | 979482ce | Georgios Gousios | for exchange in settings.EXCHANGES: |
268 | c4e55622 | Christos Stavrakakis | result = client.exchange_delete(exchange=exchange) |
269 | c4e55622 | Christos Stavrakakis | print "Deleting exchange %s. Result: %s" % (exchange, result) |
270 | 979482ce | Georgios Gousios | |
271 | c4e55622 | Christos Stavrakakis | client.close() |
272 | f30730c0 | Georgios Gousios | |
273 | c183005e | Georgios Gousios | |
274 | 979482ce | Georgios Gousios | def drain_queue(queue): |
275 | e6f5bb10 | Vangelis Koukis | """Strip a (declared) queue from all outstanding messages"""
|
276 | 698d0666 | Georgios Gousios | global QUEUES, BINDINGS
|
277 | 979482ce | Georgios Gousios | if not queue: |
278 | 979482ce | Georgios Gousios | return
|
279 | 979482ce | Georgios Gousios | |
280 | 698d0666 | Georgios Gousios | if not queue in QUEUES: |
281 | 979482ce | Georgios Gousios | print "Queue %s not configured" % queue |
282 | 979482ce | Georgios Gousios | return
|
283 | 979482ce | Georgios Gousios | |
284 | 979482ce | Georgios Gousios | print "Queue to be drained: %s" % queue |
285 | 979482ce | Georgios Gousios | |
286 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
287 | 979482ce | Georgios Gousios | return
|
288 | c4e55622 | Christos Stavrakakis | |
289 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
290 | c4e55622 | Christos Stavrakakis | client.connect() |
291 | 979482ce | Georgios Gousios | |
292 | e6209aa2 | Georgios Gousios | # Register a temporary queue binding
|
293 | 698d0666 | Georgios Gousios | for binding in BINDINGS: |
294 | e6209aa2 | Georgios Gousios | if binding[0] == queue: |
295 | e6209aa2 | Georgios Gousios | exch = binding[1]
|
296 | e6209aa2 | Georgios Gousios | |
297 | c4e55622 | Christos Stavrakakis | tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
298 | e6209aa2 | Georgios Gousios | |
299 | e6209aa2 | Georgios Gousios | print "Queue draining about to start, hit Ctrl+c when done" |
300 | e6209aa2 | Georgios Gousios | time.sleep(2)
|
301 | e6209aa2 | Georgios Gousios | print "Queue draining starting" |
302 | e6209aa2 | Georgios Gousios | |
303 | e6209aa2 | Georgios Gousios | signal(SIGTERM, _exit_handler) |
304 | e6209aa2 | Georgios Gousios | signal(SIGINT, _exit_handler) |
305 | e6209aa2 | Georgios Gousios | |
306 | c626e1d0 | Georgios Gousios | num_processed = 0
|
307 | e6209aa2 | Georgios Gousios | while True: |
308 | c4e55622 | Christos Stavrakakis | client.basic_wait() |
309 | c626e1d0 | Georgios Gousios | num_processed += 1
|
310 | c626e1d0 | Georgios Gousios | sys.stderr.write("Ignored %d messages\r" % num_processed)
|
311 | c626e1d0 | Georgios Gousios | |
312 | c4e55622 | Christos Stavrakakis | client.basic_cancel(tag) |
313 | c4e55622 | Christos Stavrakakis | client.close() |
314 | 95aee02c | Vangelis Koukis | |
315 | 979482ce | Georgios Gousios | |
316 | 95aee02c | Vangelis Koukis | |
317 | 979482ce | Georgios Gousios | def get_user_confirmation(): |
318 | 979482ce | Georgios Gousios | ans = raw_input("Are you sure (N/y):") |
319 | 979482ce | Georgios Gousios | |
320 | 979482ce | Georgios Gousios | if not ans: |
321 | 979482ce | Georgios Gousios | return False |
322 | 979482ce | Georgios Gousios | if ans not in ['Y', 'y']: |
323 | 979482ce | Georgios Gousios | return False |
324 | 979482ce | Georgios Gousios | return True |
325 | 979482ce | Georgios Gousios | |
326 | 979482ce | Georgios Gousios | |
327 | 57d0082a | Georgios Gousios | def debug_mode(): |
328 | 2bf8d695 | Vangelis Koukis | disp = Dispatcher(debug=True)
|
329 | 838239fa | Georgios Gousios | signal(SIGINT, _exit_handler) |
330 | 838239fa | Georgios Gousios | signal(SIGTERM, _exit_handler) |
331 | 838239fa | Georgios Gousios | |
332 | 838239fa | Georgios Gousios | disp.wait() |
333 | 838239fa | Georgios Gousios | |
334 | 838239fa | Georgios Gousios | |
335 | d28244af | Vangelis Koukis | def daemon_mode(opts): |
336 | 9e98ba3c | Giorgos Verigakis | global children
|
337 | 698d0666 | Georgios Gousios | |
338 | d28244af | Vangelis Koukis | # Create pidfile,
|
339 | cf2a3529 | Christos Stavrakakis | pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
|
340 | 4ed2e471 | Georgios Gousios | |
341 | cf2a3529 | Christos Stavrakakis | if daemon.runner.is_pidfile_stale(pidf):
|
342 | cf2a3529 | Christos Stavrakakis | log.warning("Removing stale PID lock file %s", pidf.path)
|
343 | cf2a3529 | Christos Stavrakakis | pidf.break_lock() |
344 | cf2a3529 | Christos Stavrakakis | |
345 | cf2a3529 | Christos Stavrakakis | try:
|
346 | cf2a3529 | Christos Stavrakakis | pidf.acquire() |
347 | cf2a3529 | Christos Stavrakakis | except (pidlockfile.AlreadyLocked, LockTimeout):
|
348 | cf2a3529 | Christos Stavrakakis | log.critical("Failed to lock pidfile %s, another instance running?",
|
349 | cf2a3529 | Christos Stavrakakis | pidf.path) |
350 | cf2a3529 | Christos Stavrakakis | sys.exit(1)
|
351 | de081774 | Georgios Gousios | |
352 | 9e98ba3c | Giorgos Verigakis | log.info("Became a daemon")
|
353 | 57d0082a | Georgios Gousios | |
354 | c183005e | Georgios Gousios | # Fork workers
|
355 | 8861126f | Georgios Gousios | children = [] |
356 | 8861126f | Georgios Gousios | |
357 | 8861126f | Georgios Gousios | i = 0
|
358 | 8861126f | Georgios Gousios | while i < opts.workers:
|
359 | 8861126f | Georgios Gousios | newpid = os.fork() |
360 | 8861126f | Georgios Gousios | |
361 | 8861126f | Georgios Gousios | if newpid == 0: |
362 | d28244af | Vangelis Koukis | signal(SIGINT, _exit_handler) |
363 | c183005e | Georgios Gousios | signal(SIGTERM, _exit_handler) |
364 | 57d0082a | Georgios Gousios | child(sys.argv[1:])
|
365 | 4dc0b46a | Georgios Gousios | sys.exit(1)
|
366 | 8861126f | Georgios Gousios | else:
|
367 | 9e98ba3c | Giorgos Verigakis | log.debug("%d, forked child: %d", os.getpid(), newpid)
|
368 | 9e98ba3c | Giorgos Verigakis | children.append(newpid) |
369 | 8861126f | Georgios Gousios | i += 1
|
370 | 8861126f | Georgios Gousios | |
371 | 8d8ea051 | Georgios Gousios | # Catch signals to ensure graceful shutdown
|
372 | d28244af | Vangelis Koukis | signal(SIGINT, _parent_handler) |
373 | c183005e | Georgios Gousios | signal(SIGTERM, _parent_handler) |
374 | 8861126f | Georgios Gousios | |
375 | 57d0082a | Georgios Gousios | # Wait for all children processes to die, one by one
|
376 | d28244af | Vangelis Koukis | try:
|
377 | de081774 | Georgios Gousios | for pid in children: |
378 | de081774 | Georgios Gousios | try:
|
379 | de081774 | Georgios Gousios | os.waitpid(pid, 0)
|
380 | de081774 | Georgios Gousios | except Exception: |
381 | de081774 | Georgios Gousios | pass
|
382 | de081774 | Georgios Gousios | finally:
|
383 | de081774 | Georgios Gousios | pidf.release() |
384 | c183005e | Georgios Gousios | |
385 | d28244af | Vangelis Koukis | |
386 | d28244af | Vangelis Koukis | def main(): |
387 | cf2a3529 | Christos Stavrakakis | (opts, args) = parse_arguments(sys.argv[1:])
|
388 | cf2a3529 | Christos Stavrakakis | |
389 | 3d975c75 | Kostas Papadimitriou | dictConfig(settings.DISPATCHER_LOGGING) |
390 | 3d975c75 | Kostas Papadimitriou | |
391 | 9e98ba3c | Giorgos Verigakis | global log
|
392 | 3d975c75 | Kostas Papadimitriou | |
393 | d28244af | Vangelis Koukis | # Init the global variables containing the queues
|
394 | d28244af | Vangelis Koukis | _init_queues() |
395 | d28244af | Vangelis Koukis | |
396 | d28244af | Vangelis Koukis | # Special case for the clean up queues action
|
397 | d28244af | Vangelis Koukis | if opts.purge_queues:
|
398 | d28244af | Vangelis Koukis | purge_queues() |
399 | d28244af | Vangelis Koukis | return
|
400 | d28244af | Vangelis Koukis | |
401 | d28244af | Vangelis Koukis | # Special case for the clean up exch action
|
402 | d28244af | Vangelis Koukis | if opts.purge_exchanges:
|
403 | d28244af | Vangelis Koukis | purge_exchanges() |
404 | d28244af | Vangelis Koukis | return
|
405 | d28244af | Vangelis Koukis | |
406 | d28244af | Vangelis Koukis | if opts.drain_queue:
|
407 | d28244af | Vangelis Koukis | drain_queue(opts.drain_queue) |
408 | d28244af | Vangelis Koukis | return
|
409 | d28244af | Vangelis Koukis | |
410 | d28244af | Vangelis Koukis | # Debug mode, process messages without spawning workers
|
411 | d28244af | Vangelis Koukis | if opts.debug:
|
412 | d28244af | Vangelis Koukis | debug_mode() |
413 | d28244af | Vangelis Koukis | return
|
414 | 7c62bd54 | Kostas Papadimitriou | |
415 | 9e98ba3c | Giorgos Verigakis | files_preserve = [] |
416 | 9e98ba3c | Giorgos Verigakis | for handler in log.handlers: |
417 | 9e98ba3c | Giorgos Verigakis | stream = getattr(handler, 'stream') |
418 | 9e98ba3c | Giorgos Verigakis | if stream and hasattr(stream, 'fileno'): |
419 | 9e98ba3c | Giorgos Verigakis | files_preserve.append(handler.stream) |
420 | 7c62bd54 | Kostas Papadimitriou | |
421 | cf2a3529 | Christos Stavrakakis | daemon_context = daemon.daemon.DaemonContext( |
422 | d28244af | Vangelis Koukis | files_preserve=files_preserve, |
423 | d28244af | Vangelis Koukis | umask=022)
|
424 | 7c62bd54 | Kostas Papadimitriou | |
425 | d28244af | Vangelis Koukis | daemon_context.open() |
426 | d28244af | Vangelis Koukis | |
427 | d28244af | Vangelis Koukis | # Catch every exception, make sure it gets logged properly
|
428 | d28244af | Vangelis Koukis | try:
|
429 | d28244af | Vangelis Koukis | daemon_mode(opts) |
430 | d28244af | Vangelis Koukis | except Exception: |
431 | 9e98ba3c | Giorgos Verigakis | log.exception("Unknown error")
|
432 | d28244af | Vangelis Koukis | raise
|
433 | d28244af | Vangelis Koukis | |
434 | d28244af | Vangelis Koukis | |
435 | 7c62bd54 | Kostas Papadimitriou | if __name__ == "__main__": |
436 | 3d975c75 | Kostas Papadimitriou | sys.exit(main()) |
437 | 7c62bd54 | Kostas Papadimitriou | |
438 | 8d8ea051 | Georgios Gousios | # vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |