root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 1fb3293d
History | View | Annotate | Download (11.9 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 48130e66 | Georgios Gousios | # Copyright 2011 GRNET S.A. All rights reserved.
|
3 | 87ace70f | Vassilios Karakoidas | #
|
4 | 48130e66 | Georgios Gousios | # Redistribution and use in source and binary forms, with or without
|
5 | 48130e66 | Georgios Gousios | # modification, are permitted provided that the following conditions
|
6 | 48130e66 | Georgios Gousios | # are met:
|
7 | 7bd50624 | Vassilios Karakoidas | #
|
8 | 48130e66 | Georgios Gousios | # 1. Redistributions of source code must retain the above copyright
|
9 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer.
|
10 | 48130e66 | Georgios Gousios | #
|
11 | 48130e66 | Georgios Gousios | # 2. Redistributions in binary form must reproduce the above copyright
|
12 | 48130e66 | Georgios Gousios | # notice, this list of conditions and the following disclaimer in the
|
13 | 48130e66 | Georgios Gousios | # documentation and/or other materials provided with the distribution.
|
14 | 48130e66 | Georgios Gousios | #
|
15 | 48130e66 | Georgios Gousios | # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
|
16 | 48130e66 | Georgios Gousios | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
17 | 48130e66 | Georgios Gousios | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
18 | 48130e66 | Georgios Gousios | # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
|
19 | 48130e66 | Georgios Gousios | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
20 | 48130e66 | Georgios Gousios | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
21 | 48130e66 | Georgios Gousios | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
22 | 48130e66 | Georgios Gousios | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
23 | 48130e66 | Georgios Gousios | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
24 | 48130e66 | Georgios Gousios | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
25 | 48130e66 | Georgios Gousios | # SUCH DAMAGE.
|
26 | 48130e66 | Georgios Gousios | #
|
27 | 48130e66 | Georgios Gousios | # The views and conclusions contained in the software and documentation are
|
28 | 48130e66 | Georgios Gousios | # those of the authors and should not be interpreted as representing official
|
29 | 48130e66 | Georgios Gousios | # policies, either expressed or implied, of GRNET S.A.
|
30 | 48130e66 | Georgios Gousios | |
31 | 48130e66 | Georgios Gousios | |
32 | e6209aa2 | Georgios Gousios | """ Message queue setup, dispatch and admin
|
33 | c183005e | Georgios Gousios |
|
34 | 2cd99e7a | Georgios Gousios | This program sets up connections to the queues configured in settings.py
|
35 | 2cd99e7a | Georgios Gousios | and implements the message wait and dispatch loops. Actual messages are
|
36 | 2cd99e7a | Georgios Gousios | handled in the dispatched functions.
|
37 | fcbc5bb3 | Vassilios Karakoidas |
|
38 | d08a5f6f | Vangelis Koukis | """
|
39 | c99fe4c7 | Vassilios Karakoidas | |
40 | cf2a3529 | Christos Stavrakakis | # Fix path to import synnefo settings
|
41 | d08a5f6f | Vangelis Koukis | import sys |
42 | 86221fd5 | Panos Louridas | import os |
43 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
44 | 86221fd5 | Panos Louridas | sys.path.append(path) |
45 | 841c26cc | Georgios D. Tsoukalas | |
46 | 841c26cc | Georgios D. Tsoukalas | os.environ['DJANGO_SETTINGS_MODULE'] = 'synnefo.settings' |
47 | 0c09b1c0 | Christos Stavrakakis | from django.conf import settings |
48 | d08a5f6f | Vangelis Koukis | |
49 | 5da13d77 | Vangelis Koukis | from django.db import close_connection |
50 | 5da13d77 | Vangelis Koukis | |
51 | 8d8ea051 | Georgios Gousios | import time |
52 | 4ed2e471 | Georgios Gousios | |
53 | 4936f2e2 | Christos Stavrakakis | import daemon |
54 | cf2a3529 | Christos Stavrakakis | import daemon.runner |
55 | cf2a3529 | Christos Stavrakakis | from lockfile import LockTimeout |
56 | 4ed2e471 | Georgios Gousios | # Take care of differences between python-daemon versions.
|
57 | 4ed2e471 | Georgios Gousios | try:
|
58 | cf2a3529 | Christos Stavrakakis | from daemon import pidfile as pidlockfile |
59 | 4ed2e471 | Georgios Gousios | except:
|
60 | 4ed2e471 | Georgios Gousios | from daemon import pidlockfile |
61 | 1dc821c9 | Christos Stavrakakis | import setproctitle |
62 | 8d8ea051 | Georgios Gousios | |
63 | cf2a3529 | Christos Stavrakakis | from synnefo.lib.amqp import AMQPClient |
64 | 9cb903f9 | Vangelis Koukis | from synnefo.logic import callbacks |
65 | 659de616 | Christos Stavrakakis | from synnefo.logic import queues |
66 | 9e98ba3c | Giorgos Verigakis | |
67 | d01cd522 | Christos Stavrakakis | import logging |
68 | fdfd8c6d | Christos Stavrakakis | |
69 | fdfd8c6d | Christos Stavrakakis | log = logging.getLogger("dispatcher")
|
70 | fdfd8c6d | Christos Stavrakakis | log_amqp = logging.getLogger("amqp")
|
71 | fdfd8c6d | Christos Stavrakakis | log_logic = logging.getLogger("synnefo.logic")
|
72 | fdfd8c6d | Christos Stavrakakis | |
73 | fdfd8c6d | Christos Stavrakakis | LOGGERS = [log, log_amqp, log_logic] |
74 | 9e98ba3c | Giorgos Verigakis | |
75 | 2bf8d695 | Vangelis Koukis | |
76 | 78e2d194 | Georgios Gousios | class Dispatcher: |
77 | 5d081749 | Georgios Gousios | debug = False
|
78 | 78e2d194 | Georgios Gousios | |
79 | 2bf8d695 | Vangelis Koukis | def __init__(self, debug=False): |
80 | 5d081749 | Georgios Gousios | self.debug = debug
|
81 | 5d081749 | Georgios Gousios | self._init()
|
82 | da102335 | Georgios Gousios | |
83 | 78e2d194 | Georgios Gousios | def wait(self): |
84 | c4e55622 | Christos Stavrakakis | log.info("Waiting for messages..")
|
85 | b1bb9251 | Christos Stavrakakis | timeout = 600
|
86 | 78e2d194 | Georgios Gousios | while True: |
87 | 78e2d194 | Georgios Gousios | try:
|
88 | 5da13d77 | Vangelis Koukis | # Close the Django DB connection before processing
|
89 | 5da13d77 | Vangelis Koukis | # every incoming message. This plays nicely with
|
90 | 5da13d77 | Vangelis Koukis | # DB connection pooling, if enabled and allows
|
91 | 5da13d77 | Vangelis Koukis | # the dispatcher to recover from broken connections
|
92 | 5da13d77 | Vangelis Koukis | # gracefully.
|
93 | 5da13d77 | Vangelis Koukis | close_connection() |
94 | b1bb9251 | Christos Stavrakakis | msg = self.client.basic_wait(timeout=timeout)
|
95 | b1bb9251 | Christos Stavrakakis | if not msg: |
96 | b1bb9251 | Christos Stavrakakis | log.warning("Idle connection for %d seconds. Will connect"
|
97 | b1bb9251 | Christos Stavrakakis | " to a different host. Verify that"
|
98 | b1bb9251 | Christos Stavrakakis | " snf-ganeti-eventd is running!!", timeout)
|
99 | b1bb9251 | Christos Stavrakakis | self.client.reconnect()
|
100 | 78e2d194 | Georgios Gousios | except SystemExit: |
101 | 78e2d194 | Georgios Gousios | break
|
102 | c4e55622 | Christos Stavrakakis | except Exception as e: |
103 | c4e55622 | Christos Stavrakakis | log.exception("Caught unexpected exception: %s", e)
|
104 | c4e55622 | Christos Stavrakakis | |
105 | c4e55622 | Christos Stavrakakis | self.client.basic_cancel()
|
106 | c4e55622 | Christos Stavrakakis | self.client.close()
|
107 | 78e2d194 | Georgios Gousios | |
108 | 5d081749 | Georgios Gousios | def _init(self): |
109 | 9e98ba3c | Giorgos Verigakis | log.info("Initializing")
|
110 | 226f086a | Georgios Gousios | |
111 | a8858945 | Christos Stavrakakis | self.client = AMQPClient(logger=log_amqp)
|
112 | c4e55622 | Christos Stavrakakis | # Connect to AMQP host
|
113 | c4e55622 | Christos Stavrakakis | self.client.connect()
|
114 | 78e2d194 | Georgios Gousios | |
115 | c183005e | Georgios Gousios | # Declare queues and exchanges
|
116 | 659de616 | Christos Stavrakakis | exchange = settings.EXCHANGE_GANETI |
117 | 659de616 | Christos Stavrakakis | exchange_dl = queues.convert_exchange_to_dead(exchange) |
118 | 659de616 | Christos Stavrakakis | self.client.exchange_declare(exchange=exchange,
|
119 | 659de616 | Christos Stavrakakis | type="topic")
|
120 | 659de616 | Christos Stavrakakis | self.client.exchange_declare(exchange=exchange_dl,
|
121 | 659de616 | Christos Stavrakakis | type="topic")
|
122 | 659de616 | Christos Stavrakakis | |
123 | 659de616 | Christos Stavrakakis | for queue in queues.QUEUES: |
124 | c4e55622 | Christos Stavrakakis | # Queues are mirrored to all RabbitMQ brokers
|
125 | 659de616 | Christos Stavrakakis | self.client.queue_declare(queue=queue, mirrored=True, |
126 | 659de616 | Christos Stavrakakis | dead_letter_exchange=exchange_dl) |
127 | 659de616 | Christos Stavrakakis | # Declare the corresponding dead-letter queue
|
128 | 659de616 | Christos Stavrakakis | queue_dl = queues.convert_queue_to_dead(queue) |
129 | 659de616 | Christos Stavrakakis | self.client.queue_declare(queue=queue_dl, mirrored=True) |
130 | 78e2d194 | Georgios Gousios | |
131 | c183005e | Georgios Gousios | # Bind queues to handler methods
|
132 | 659de616 | Christos Stavrakakis | for binding in queues.BINDINGS: |
133 | 78e2d194 | Georgios Gousios | try:
|
134 | 9cb903f9 | Vangelis Koukis | callback = getattr(callbacks, binding[3]) |
135 | 23c84263 | Georgios Gousios | except AttributeError: |
136 | 9e98ba3c | Giorgos Verigakis | log.error("Cannot find callback %s", binding[3]) |
137 | e6f5bb10 | Vangelis Koukis | raise SystemExit(1) |
138 | 659de616 | Christos Stavrakakis | queue = binding[0]
|
139 | 659de616 | Christos Stavrakakis | exchange = binding[1]
|
140 | 659de616 | Christos Stavrakakis | routing_key = binding[2]
|
141 | 8d8ea051 | Georgios Gousios | |
142 | 659de616 | Christos Stavrakakis | self.client.queue_bind(queue=queue, exchange=exchange,
|
143 | 659de616 | Christos Stavrakakis | routing_key=routing_key) |
144 | c4e55622 | Christos Stavrakakis | |
145 | 370f69ec | Christos Stavrakakis | self.client.basic_consume(queue=binding[0], |
146 | 6d27eadd | Christos Stavrakakis | callback=callback, |
147 | 6d27eadd | Christos Stavrakakis | prefetch_count=5)
|
148 | c4e55622 | Christos Stavrakakis | |
149 | 659de616 | Christos Stavrakakis | queue_dl = queues.convert_queue_to_dead(queue) |
150 | 659de616 | Christos Stavrakakis | exchange_dl = queues.convert_exchange_to_dead(exchange) |
151 | 659de616 | Christos Stavrakakis | # Bind the corresponding dead-letter queue
|
152 | 659de616 | Christos Stavrakakis | self.client.queue_bind(queue=queue_dl,
|
153 | 659de616 | Christos Stavrakakis | exchange=exchange_dl, |
154 | 659de616 | Christos Stavrakakis | routing_key=routing_key) |
155 | 659de616 | Christos Stavrakakis | |
156 | 9e98ba3c | Giorgos Verigakis | log.debug("Binding %s(%s) to queue %s with handler %s",
|
157 | 659de616 | Christos Stavrakakis | exchange, routing_key, queue, binding[3])
|
158 | 698d0666 | Georgios Gousios | |
159 | 698d0666 | Georgios Gousios | |
160 | 78e2d194 | Georgios Gousios | def parse_arguments(args): |
161 | 78e2d194 | Georgios Gousios | from optparse import OptionParser |
162 | 78e2d194 | Georgios Gousios | |
163 | 4936f2e2 | Christos Stavrakakis | default_pid_file = \ |
164 | 4936f2e2 | Christos Stavrakakis | os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:] |
165 | 78e2d194 | Georgios Gousios | parser = OptionParser() |
166 | c183005e | Georgios Gousios | parser.add_option("-d", "--debug", action="store_true", default=False, |
167 | 2cd99e7a | Georgios Gousios | dest="debug", help="Enable debug mode") |
168 | e6209aa2 | Georgios Gousios | parser.add_option("-w", "--workers", default=2, dest="workers", |
169 | e6209aa2 | Georgios Gousios | help="Number of workers to spawn", type="int") |
170 | 3d975c75 | Kostas Papadimitriou | parser.add_option("-p", "--pid-file", dest="pid_file", |
171 | 3d975c75 | Kostas Papadimitriou | default=default_pid_file, |
172 | 3d975c75 | Kostas Papadimitriou | help="Save PID to file (default: %s)" % default_pid_file)
|
173 | 979482ce | Georgios Gousios | parser.add_option("--purge-queues", action="store_true", |
174 | 979482ce | Georgios Gousios | default=False, dest="purge_queues", |
175 | 57d0082a | Georgios Gousios | help="Remove all declared queues (DANGEROUS!)")
|
176 | 979482ce | Georgios Gousios | parser.add_option("--purge-exchanges", action="store_true", |
177 | 979482ce | Georgios Gousios | default=False, dest="purge_exchanges", |
178 | 979482ce | Georgios Gousios | help="Remove all exchanges. Implies deleting all queues \
|
179 | 979482ce | Georgios Gousios | first (DANGEROUS!)")
|
180 | d5470cdd | Georgios Gousios | parser.add_option("--drain-queue", dest="drain_queue", |
181 | e6209aa2 | Georgios Gousios | help="Strips a queue from all outstanding messages")
|
182 | de081774 | Georgios Gousios | |
183 | 78e2d194 | Georgios Gousios | return parser.parse_args(args)
|
184 | 78e2d194 | Georgios Gousios | |
185 | f30730c0 | Georgios Gousios | |
186 | d28244af | Vangelis Koukis | def purge_queues(): |
187 | 979482ce | Georgios Gousios | """
|
188 | 979482ce | Georgios Gousios | Delete declared queues from RabbitMQ. Use with care!
|
189 | 979482ce | Georgios Gousios | """
|
190 | 2ef10562 | Christos Stavrakakis | client = AMQPClient(max_retries=120)
|
191 | c4e55622 | Christos Stavrakakis | client.connect() |
192 | f30730c0 | Georgios Gousios | |
193 | 659de616 | Christos Stavrakakis | print "Queues to be deleted: ", queues.QUEUES |
194 | f30730c0 | Georgios Gousios | |
195 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
196 | f30730c0 | Georgios Gousios | return
|
197 | f30730c0 | Georgios Gousios | |
198 | 659de616 | Christos Stavrakakis | for queue in queues.QUEUES: |
199 | c4e55622 | Christos Stavrakakis | result = client.queue_delete(queue=queue) |
200 | c4e55622 | Christos Stavrakakis | print "Deleting queue %s. Result: %s" % (queue, result) |
201 | 979482ce | Georgios Gousios | |
202 | c4e55622 | Christos Stavrakakis | client.close() |
203 | 979482ce | Georgios Gousios | |
204 | 979482ce | Georgios Gousios | |
205 | 979482ce | Georgios Gousios | def purge_exchanges(): |
206 | e6f5bb10 | Vangelis Koukis | """Delete declared exchanges from RabbitMQ, after removing all queues"""
|
207 | 979482ce | Georgios Gousios | purge_queues() |
208 | 979482ce | Georgios Gousios | |
209 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
210 | c4e55622 | Christos Stavrakakis | client.connect() |
211 | 979482ce | Georgios Gousios | |
212 | 659de616 | Christos Stavrakakis | exchanges = queues.EXCHANGES |
213 | 659de616 | Christos Stavrakakis | print "Exchanges to be deleted: ", exchanges |
214 | 979482ce | Georgios Gousios | |
215 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
216 | 979482ce | Georgios Gousios | return
|
217 | 979482ce | Georgios Gousios | |
218 | 659de616 | Christos Stavrakakis | for exch in exchanges: |
219 | 659de616 | Christos Stavrakakis | result = client.exchange_delete(exchange=exch) |
220 | 659de616 | Christos Stavrakakis | print "Deleting exchange %s. Result: %s" % (exch, result) |
221 | c4e55622 | Christos Stavrakakis | client.close() |
222 | f30730c0 | Georgios Gousios | |
223 | c183005e | Georgios Gousios | |
224 | 979482ce | Georgios Gousios | def drain_queue(queue): |
225 | e6f5bb10 | Vangelis Koukis | """Strip a (declared) queue from all outstanding messages"""
|
226 | 979482ce | Georgios Gousios | if not queue: |
227 | 979482ce | Georgios Gousios | return
|
228 | 979482ce | Georgios Gousios | |
229 | 659de616 | Christos Stavrakakis | if not queue in queues.QUEUES: |
230 | 979482ce | Georgios Gousios | print "Queue %s not configured" % queue |
231 | 979482ce | Georgios Gousios | return
|
232 | 979482ce | Georgios Gousios | |
233 | 979482ce | Georgios Gousios | print "Queue to be drained: %s" % queue |
234 | 979482ce | Georgios Gousios | |
235 | 979482ce | Georgios Gousios | if not get_user_confirmation(): |
236 | 979482ce | Georgios Gousios | return
|
237 | c4e55622 | Christos Stavrakakis | |
238 | c4e55622 | Christos Stavrakakis | client = AMQPClient() |
239 | c4e55622 | Christos Stavrakakis | client.connect() |
240 | 979482ce | Georgios Gousios | |
241 | c4e55622 | Christos Stavrakakis | tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
242 | e6209aa2 | Georgios Gousios | |
243 | e6209aa2 | Georgios Gousios | print "Queue draining about to start, hit Ctrl+c when done" |
244 | e6209aa2 | Georgios Gousios | time.sleep(2)
|
245 | e6209aa2 | Georgios Gousios | print "Queue draining starting" |
246 | e6209aa2 | Georgios Gousios | |
247 | c626e1d0 | Georgios Gousios | num_processed = 0
|
248 | e6209aa2 | Georgios Gousios | while True: |
249 | c4e55622 | Christos Stavrakakis | client.basic_wait() |
250 | c626e1d0 | Georgios Gousios | num_processed += 1
|
251 | c626e1d0 | Georgios Gousios | sys.stderr.write("Ignored %d messages\r" % num_processed)
|
252 | c626e1d0 | Georgios Gousios | |
253 | c4e55622 | Christos Stavrakakis | client.basic_cancel(tag) |
254 | c4e55622 | Christos Stavrakakis | client.close() |
255 | 95aee02c | Vangelis Koukis | |
256 | 979482ce | Georgios Gousios | |
257 | 979482ce | Georgios Gousios | def get_user_confirmation(): |
258 | 979482ce | Georgios Gousios | ans = raw_input("Are you sure (N/y):") |
259 | 979482ce | Georgios Gousios | |
260 | 979482ce | Georgios Gousios | if not ans: |
261 | 979482ce | Georgios Gousios | return False |
262 | 979482ce | Georgios Gousios | if ans not in ['Y', 'y']: |
263 | 979482ce | Georgios Gousios | return False |
264 | 979482ce | Georgios Gousios | return True |
265 | 979482ce | Georgios Gousios | |
266 | 979482ce | Georgios Gousios | |
267 | 57d0082a | Georgios Gousios | def debug_mode(): |
268 | 2bf8d695 | Vangelis Koukis | disp = Dispatcher(debug=True)
|
269 | 838239fa | Georgios Gousios | disp.wait() |
270 | 838239fa | Georgios Gousios | |
271 | 838239fa | Georgios Gousios | |
272 | d28244af | Vangelis Koukis | def daemon_mode(opts): |
273 | 4936f2e2 | Christos Stavrakakis | disp = Dispatcher(debug=False)
|
274 | 4936f2e2 | Christos Stavrakakis | disp.wait() |
275 | c183005e | Georgios Gousios | |
276 | d28244af | Vangelis Koukis | |
277 | d01cd522 | Christos Stavrakakis | def setup_logging(opts): |
278 | d01cd522 | Christos Stavrakakis | import logging |
279 | cc92b70f | Christos Stavrakakis | formatter = logging.Formatter("%(asctime)s %(name)s %(module)s"
|
280 | cc92b70f | Christos Stavrakakis | " [%(levelname)s] %(message)s")
|
281 | d01cd522 | Christos Stavrakakis | if opts.debug:
|
282 | fdfd8c6d | Christos Stavrakakis | log_handler = logging.StreamHandler() |
283 | fdfd8c6d | Christos Stavrakakis | log_handler.setFormatter(formatter) |
284 | d01cd522 | Christos Stavrakakis | else:
|
285 | d01cd522 | Christos Stavrakakis | import logging.handlers |
286 | d01cd522 | Christos Stavrakakis | log_file = "/var/log/synnefo/dispatcher.log"
|
287 | fdfd8c6d | Christos Stavrakakis | log_handler = logging.handlers.WatchedFileHandler(log_file) |
288 | fdfd8c6d | Christos Stavrakakis | log_handler.setFormatter(formatter) |
289 | d01cd522 | Christos Stavrakakis | |
290 | fdfd8c6d | Christos Stavrakakis | for l in LOGGERS: |
291 | fdfd8c6d | Christos Stavrakakis | l.addHandler(log_handler) |
292 | fdfd8c6d | Christos Stavrakakis | l.setLevel(logging.DEBUG) |
293 | d01cd522 | Christos Stavrakakis | |
294 | d01cd522 | Christos Stavrakakis | |
295 | d28244af | Vangelis Koukis | def main(): |
296 | cf2a3529 | Christos Stavrakakis | (opts, args) = parse_arguments(sys.argv[1:])
|
297 | cf2a3529 | Christos Stavrakakis | |
298 | 1dc821c9 | Christos Stavrakakis | # Rename this process so 'ps' output looks like this is a native
|
299 | 1dc821c9 | Christos Stavrakakis | # executable. Can not seperate command-line arguments from actual name of
|
300 | 1dc821c9 | Christos Stavrakakis | # the executable by NUL bytes, so only show the name of the executable
|
301 | 1dc821c9 | Christos Stavrakakis | # instead. setproctitle.setproctitle("\x00".join(sys.argv))
|
302 | 1dc821c9 | Christos Stavrakakis | setproctitle.setproctitle(sys.argv[0])
|
303 | d01cd522 | Christos Stavrakakis | setup_logging(opts) |
304 | 8ec69269 | Christos Stavrakakis | |
305 | d28244af | Vangelis Koukis | # Special case for the clean up queues action
|
306 | d28244af | Vangelis Koukis | if opts.purge_queues:
|
307 | d28244af | Vangelis Koukis | purge_queues() |
308 | d28244af | Vangelis Koukis | return
|
309 | d28244af | Vangelis Koukis | |
310 | d28244af | Vangelis Koukis | # Special case for the clean up exch action
|
311 | d28244af | Vangelis Koukis | if opts.purge_exchanges:
|
312 | d28244af | Vangelis Koukis | purge_exchanges() |
313 | d28244af | Vangelis Koukis | return
|
314 | d28244af | Vangelis Koukis | |
315 | d28244af | Vangelis Koukis | if opts.drain_queue:
|
316 | d28244af | Vangelis Koukis | drain_queue(opts.drain_queue) |
317 | d28244af | Vangelis Koukis | return
|
318 | d28244af | Vangelis Koukis | |
319 | 4936f2e2 | Christos Stavrakakis | # Debug mode, process messages without daemonizing
|
320 | d28244af | Vangelis Koukis | if opts.debug:
|
321 | d28244af | Vangelis Koukis | debug_mode() |
322 | d28244af | Vangelis Koukis | return
|
323 | 7c62bd54 | Kostas Papadimitriou | |
324 | 4936f2e2 | Christos Stavrakakis | # Create pidfile,
|
325 | 4936f2e2 | Christos Stavrakakis | pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
|
326 | 4936f2e2 | Christos Stavrakakis | |
327 | 4936f2e2 | Christos Stavrakakis | if daemon.runner.is_pidfile_stale(pidf):
|
328 | 4936f2e2 | Christos Stavrakakis | log.warning("Removing stale PID lock file %s", pidf.path)
|
329 | 4936f2e2 | Christos Stavrakakis | pidf.break_lock() |
330 | 4936f2e2 | Christos Stavrakakis | |
331 | 9e98ba3c | Giorgos Verigakis | files_preserve = [] |
332 | 9e98ba3c | Giorgos Verigakis | for handler in log.handlers: |
333 | 9e98ba3c | Giorgos Verigakis | stream = getattr(handler, 'stream') |
334 | 9e98ba3c | Giorgos Verigakis | if stream and hasattr(stream, 'fileno'): |
335 | 9e98ba3c | Giorgos Verigakis | files_preserve.append(handler.stream) |
336 | 7c62bd54 | Kostas Papadimitriou | |
337 | 4936f2e2 | Christos Stavrakakis | stderr_stream = None
|
338 | 4936f2e2 | Christos Stavrakakis | for handler in log.handlers: |
339 | 4936f2e2 | Christos Stavrakakis | stream = getattr(handler, 'stream') |
340 | 4936f2e2 | Christos Stavrakakis | if stream and hasattr(handler, 'baseFilename'): |
341 | 4936f2e2 | Christos Stavrakakis | stderr_stream = stream |
342 | 4936f2e2 | Christos Stavrakakis | break
|
343 | 7c62bd54 | Kostas Papadimitriou | |
344 | 4936f2e2 | Christos Stavrakakis | daemon_context = daemon.DaemonContext( |
345 | 4936f2e2 | Christos Stavrakakis | pidfile=pidf, |
346 | 4936f2e2 | Christos Stavrakakis | umask=0022,
|
347 | 4936f2e2 | Christos Stavrakakis | stdout=stderr_stream, |
348 | 4936f2e2 | Christos Stavrakakis | stderr=stderr_stream, |
349 | 4936f2e2 | Christos Stavrakakis | files_preserve=files_preserve) |
350 | 4936f2e2 | Christos Stavrakakis | |
351 | 4936f2e2 | Christos Stavrakakis | try:
|
352 | 4936f2e2 | Christos Stavrakakis | daemon_context.open() |
353 | 4936f2e2 | Christos Stavrakakis | except (pidlockfile.AlreadyLocked, LockTimeout):
|
354 | 4936f2e2 | Christos Stavrakakis | log.critical("Failed to lock pidfile %s, another instance running?",
|
355 | 4936f2e2 | Christos Stavrakakis | pidf.path) |
356 | 4936f2e2 | Christos Stavrakakis | sys.exit(1)
|
357 | 4936f2e2 | Christos Stavrakakis | |
358 | 4936f2e2 | Christos Stavrakakis | log.info("Became a daemon")
|
359 | 4936f2e2 | Christos Stavrakakis | |
360 | 4936f2e2 | Christos Stavrakakis | if 'gevent' in sys.modules: |
361 | 4936f2e2 | Christos Stavrakakis | # A fork() has occured while daemonizing. If running in
|
362 | 4936f2e2 | Christos Stavrakakis | # gevent context we *must* reinit gevent
|
363 | 4936f2e2 | Christos Stavrakakis | log.debug("gevent imported. Reinitializing gevent")
|
364 | 4936f2e2 | Christos Stavrakakis | import gevent |
365 | 4936f2e2 | Christos Stavrakakis | gevent.reinit() |
366 | d28244af | Vangelis Koukis | |
367 | d28244af | Vangelis Koukis | # Catch every exception, make sure it gets logged properly
|
368 | d28244af | Vangelis Koukis | try:
|
369 | d28244af | Vangelis Koukis | daemon_mode(opts) |
370 | d28244af | Vangelis Koukis | except Exception: |
371 | 9e98ba3c | Giorgos Verigakis | log.exception("Unknown error")
|
372 | d28244af | Vangelis Koukis | raise
|
373 | d28244af | Vangelis Koukis | |
374 | 7c62bd54 | Kostas Papadimitriou | if __name__ == "__main__": |
375 | 3d975c75 | Kostas Papadimitriou | sys.exit(main()) |
376 | 7c62bd54 | Kostas Papadimitriou | |
377 | 8d8ea051 | Georgios Gousios | # vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |