Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ 3c755209

History | View | Annotate | Download (13.9 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
40 c99fe4c7 Vassilios Karakoidas
41 d08a5f6f Vangelis Koukis
import sys
42 86221fd5 Panos Louridas
import os
43 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44 86221fd5 Panos Louridas
sys.path.append(path)
45 d08a5f6f Vangelis Koukis
46 de470b1e Kostas Papadimitriou
from synnefo import settings
47 d08a5f6f Vangelis Koukis
setup_environ(settings)
48 d08a5f6f Vangelis Koukis
49 da102335 Georgios Gousios
from amqplib import client_0_8 as amqp
50 8861126f Georgios Gousios
from signal import signal, SIGINT, SIGTERM
51 23c84263 Georgios Gousios
52 9e98ba3c Giorgos Verigakis
import logging
53 8d8ea051 Georgios Gousios
import time
54 8d8ea051 Georgios Gousios
import socket
55 4ed2e471 Georgios Gousios
from daemon import daemon
56 4ed2e471 Georgios Gousios
57 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
58 4ed2e471 Georgios Gousios
try:
59 4ed2e471 Georgios Gousios
    from daemon import pidfile
60 4ed2e471 Georgios Gousios
except:
61 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
62 8d8ea051 Georgios Gousios
63 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
64 9e98ba3c Giorgos Verigakis
from synnefo.util.dictconfig import dictConfig
65 9e98ba3c Giorgos Verigakis
66 9e98ba3c Giorgos Verigakis
67 9e98ba3c Giorgos Verigakis
log = logging.getLogger()
68 9e98ba3c Giorgos Verigakis
69 c99fe4c7 Vassilios Karakoidas
70 698d0666 Georgios Gousios
# Queue names
71 698d0666 Georgios Gousios
QUEUES = []
72 698d0666 Georgios Gousios
73 698d0666 Georgios Gousios
# Queue bindings to exchanges
74 698d0666 Georgios Gousios
BINDINGS = []
75 698d0666 Georgios Gousios
76 2bf8d695 Vangelis Koukis
77 78e2d194 Georgios Gousios
class Dispatcher:
78 78e2d194 Georgios Gousios
    chan = None
79 5d081749 Georgios Gousios
    debug = False
80 5d081749 Georgios Gousios
    clienttags = []
81 78e2d194 Georgios Gousios
82 2bf8d695 Vangelis Koukis
    def __init__(self, debug=False):
83 5d081749 Georgios Gousios
        self.debug = debug
84 5d081749 Georgios Gousios
        self._init()
85 da102335 Georgios Gousios
86 78e2d194 Georgios Gousios
    def wait(self):
87 78e2d194 Georgios Gousios
        while True:
88 78e2d194 Georgios Gousios
            try:
89 78e2d194 Georgios Gousios
                self.chan.wait()
90 78e2d194 Georgios Gousios
            except SystemExit:
91 78e2d194 Georgios Gousios
                break
92 cadaffb1 Georgios Gousios
            except amqp.exceptions.AMQPConnectionException:
93 9e98ba3c Giorgos Verigakis
                log.error("Server went away, reconnecting...")
94 cadaffb1 Georgios Gousios
                self._init()
95 5d081749 Georgios Gousios
            except socket.error:
96 9e98ba3c Giorgos Verigakis
                log.error("Server went away, reconnecting...")
97 5d081749 Georgios Gousios
                self._init()
98 e6f5bb10 Vangelis Koukis
            except Exception, e:
99 9e98ba3c Giorgos Verigakis
                log.exception("Caught unexpected exception")
100 78e2d194 Georgios Gousios
101 5d081749 Georgios Gousios
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
102 78e2d194 Georgios Gousios
        self.chan.connection.close()
103 838239fa Georgios Gousios
        self.chan.close()
104 78e2d194 Georgios Gousios
105 5d081749 Georgios Gousios
    def _init(self):
106 698d0666 Georgios Gousios
        global QUEUES, BINDINGS
107 9e98ba3c Giorgos Verigakis
        log.info("Initializing")
108 226f086a Georgios Gousios
109 c183005e Georgios Gousios
        # Connect to RabbitMQ
110 23c84263 Georgios Gousios
        conn = None
111 23c84263 Georgios Gousios
        while conn == None:
112 9e98ba3c Giorgos Verigakis
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
113 23c84263 Georgios Gousios
            try:
114 41f2249e Vangelis Koukis
                conn = amqp.Connection(host=settings.RABBIT_HOST,
115 41f2249e Vangelis Koukis
                                       userid=settings.RABBIT_USERNAME,
116 41f2249e Vangelis Koukis
                                       password=settings.RABBIT_PASSWORD,
117 41f2249e Vangelis Koukis
                                       virtual_host=settings.RABBIT_VHOST)
118 23c84263 Georgios Gousios
            except socket.error:
119 9e98ba3c Giorgos Verigakis
                log.error("Failed to connect to %s, retrying in 10s",
120 d28244af Vangelis Koukis
                                  settings.RABBIT_HOST)
121 d28244af Vangelis Koukis
                time.sleep(10)
122 23c84263 Georgios Gousios
123 9e98ba3c Giorgos Verigakis
        log.info("Connection succesful, opening channel")
124 23c84263 Georgios Gousios
        self.chan = conn.channel()
125 78e2d194 Georgios Gousios
126 c183005e Georgios Gousios
        # Declare queues and exchanges
127 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
128 c183005e Georgios Gousios
            self.chan.exchange_declare(exchange=exchange, type="topic",
129 c183005e Georgios Gousios
                                       durable=True, auto_delete=False)
130 f30730c0 Georgios Gousios
131 226f086a Georgios Gousios
        for queue in QUEUES:
132 c183005e Georgios Gousios
            self.chan.queue_declare(queue=queue, durable=True,
133 c183005e Georgios Gousios
                                    exclusive=False, auto_delete=False)
134 78e2d194 Georgios Gousios
135 226f086a Georgios Gousios
        bindings = BINDINGS
136 78e2d194 Georgios Gousios
137 c183005e Georgios Gousios
        # Bind queues to handler methods
138 5d081749 Georgios Gousios
        for binding in bindings:
139 78e2d194 Georgios Gousios
            try:
140 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
141 23c84263 Georgios Gousios
            except AttributeError:
142 9e98ba3c Giorgos Verigakis
                log.error("Cannot find callback %s", binding[3])
143 e6f5bb10 Vangelis Koukis
                raise SystemExit(1)
144 8d8ea051 Georgios Gousios
145 c183005e Georgios Gousios
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
146 c183005e Georgios Gousios
                                 routing_key=binding[2])
147 2cd99e7a Georgios Gousios
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
148 9e98ba3c Giorgos Verigakis
            log.debug("Binding %s(%s) to queue %s with handler %s",
149 9e98ba3c Giorgos Verigakis
                              binding[1], binding[2], binding[0], binding[3])
150 23c84263 Georgios Gousios
            self.clienttags.append(tag)
151 8d8ea051 Georgios Gousios
152 c183005e Georgios Gousios
153 698d0666 Georgios Gousios
def _init_queues():
154 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
155 698d0666 Georgios Gousios
156 698d0666 Georgios Gousios
    # Queue declarations
157 698d0666 Georgios Gousios
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
158 698d0666 Georgios Gousios
159 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
160 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
161 9068cd85 Georgios Gousios
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
162 698d0666 Georgios Gousios
    QUEUE_RECONC = "%s-reconciliation" % prefix
163 698d0666 Georgios Gousios
    if settings.DEBUG is True:
164 698d0666 Georgios Gousios
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
165 698d0666 Georgios Gousios
166 fd56d250 Giorgos Verigakis
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET, QUEUE_RECONC,
167 fd56d250 Giorgos Verigakis
              QUEUE_GANETI_BUILD_PROGR)
168 698d0666 Georgios Gousios
169 698d0666 Georgios Gousios
    # notifications of type "ganeti-op-status"
170 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
171 698d0666 Georgios Gousios
    # notifications of type "ganeti-net-status"
172 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
173 e6f5bb10 Vangelis Koukis
    # notifications of type "ganeti-create-progress"
174 e6f5bb10 Vangelis Koukis
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
175 0230cd3a Vangelis Koukis
    # reconciliation
176 633d9cfa Georgios Gousios
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
177 698d0666 Georgios Gousios
178 698d0666 Georgios Gousios
    BINDINGS = [
179 9068cd85 Georgios Gousios
    # Queue                   # Exchange                # RouteKey              # Handler
180 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
181 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
182 9068cd85 Georgios Gousios
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
183 633d9cfa Georgios Gousios
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
184 698d0666 Georgios Gousios
    ]
185 698d0666 Georgios Gousios
186 698d0666 Georgios Gousios
    if settings.DEBUG is True:
187 698d0666 Georgios Gousios
        BINDINGS += [
188 698d0666 Georgios Gousios
            # Queue       # Exchange          # RouteKey  # Handler
189 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
190 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
191 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
192 698d0666 Georgios Gousios
        ]
193 698d0666 Georgios Gousios
        QUEUES += (QUEUE_DEBUG,)
194 698d0666 Georgios Gousios
195 698d0666 Georgios Gousios
196 c183005e Georgios Gousios
def _exit_handler(signum, frame):
197 d28244af Vangelis Koukis
    """"Catch exit signal in children processes"""
198 9e98ba3c Giorgos Verigakis
    log.info("Caught signal %d, will raise SystemExit", signum)
199 8d8ea051 Georgios Gousios
    raise SystemExit
200 8d8ea051 Georgios Gousios
201 c183005e Georgios Gousios
202 c183005e Georgios Gousios
def _parent_handler(signum, frame):
203 c183005e Georgios Gousios
    """"Catch exit signal in parent process and forward it to children."""
204 9e98ba3c Giorgos Verigakis
    global children
205 9e98ba3c Giorgos Verigakis
    log.info("Caught signal %d, sending SIGTERM to children %s",
206 d28244af Vangelis Koukis
                signum, children)
207 8861126f Georgios Gousios
    [os.kill(pid, SIGTERM) for pid in children]
208 8861126f Georgios Gousios
209 c183005e Georgios Gousios
210 57d0082a Georgios Gousios
def child(cmdline):
211 c183005e Georgios Gousios
    """The context of the child process"""
212 c183005e Georgios Gousios
213 c183005e Georgios Gousios
    # Cmd line argument parsing
214 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(cmdline)
215 d28244af Vangelis Koukis
    disp = Dispatcher(debug=opts.debug)
216 78e2d194 Georgios Gousios
217 c183005e Georgios Gousios
    # Start the event loop
218 2cd99e7a Georgios Gousios
    disp.wait()
219 78e2d194 Georgios Gousios
220 c183005e Georgios Gousios
221 78e2d194 Georgios Gousios
def parse_arguments(args):
222 78e2d194 Georgios Gousios
    from optparse import OptionParser
223 78e2d194 Georgios Gousios
224 3d975c75 Kostas Papadimitriou
    default_pid_file = os.path.join("var","run","synnefo","dispatcher.pid")
225 78e2d194 Georgios Gousios
    parser = OptionParser()
226 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
227 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
228 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
229 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
230 3d975c75 Kostas Papadimitriou
    parser.add_option("-p", "--pid-file", dest="pid_file",
231 3d975c75 Kostas Papadimitriou
                      default=default_pid_file,
232 3d975c75 Kostas Papadimitriou
                      help="Save PID to file (default: %s)" % default_pid_file)
233 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
234 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
235 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
236 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
237 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
238 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
239 979482ce Georgios Gousios
                           first (DANGEROUS!)")
240 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
241 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
242 de081774 Georgios Gousios
243 78e2d194 Georgios Gousios
    return parser.parse_args(args)
244 78e2d194 Georgios Gousios
245 f30730c0 Georgios Gousios
246 d28244af Vangelis Koukis
def purge_queues():
247 979482ce Georgios Gousios
    """
248 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
249 979482ce Georgios Gousios
    """
250 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
251 979482ce Georgios Gousios
    conn = get_connection()
252 f30730c0 Georgios Gousios
    chan = conn.channel()
253 f30730c0 Georgios Gousios
254 698d0666 Georgios Gousios
    print "Queues to be deleted: ", QUEUES
255 f30730c0 Georgios Gousios
256 979482ce Georgios Gousios
    if not get_user_confirmation():
257 f30730c0 Georgios Gousios
        return
258 f30730c0 Georgios Gousios
259 698d0666 Georgios Gousios
    for queue in QUEUES:
260 f30730c0 Georgios Gousios
        try:
261 f30730c0 Georgios Gousios
            chan.queue_delete(queue=queue)
262 979482ce Georgios Gousios
            print "Deleting queue %s" % queue
263 979482ce Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
264 979482ce Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
265 979482ce Georgios Gousios
            chan = conn.channel()
266 979482ce Georgios Gousios
267 979482ce Georgios Gousios
    chan.connection.close()
268 979482ce Georgios Gousios
269 979482ce Georgios Gousios
270 979482ce Georgios Gousios
def purge_exchanges():
271 e6f5bb10 Vangelis Koukis
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
272 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
273 979482ce Georgios Gousios
    purge_queues()
274 979482ce Georgios Gousios
275 979482ce Georgios Gousios
    conn = get_connection()
276 979482ce Georgios Gousios
    chan = conn.channel()
277 979482ce Georgios Gousios
278 e6f5bb10 Vangelis Koukis
    print "Exchanges to be deleted: ", settings.EXCHANGES
279 979482ce Georgios Gousios
280 979482ce Georgios Gousios
    if not get_user_confirmation():
281 979482ce Georgios Gousios
        return
282 979482ce Georgios Gousios
283 979482ce Georgios Gousios
    for exchange in settings.EXCHANGES:
284 979482ce Georgios Gousios
        try:
285 979482ce Georgios Gousios
            chan.exchange_delete(exchange=exchange)
286 f30730c0 Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
287 f30730c0 Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
288 979482ce Georgios Gousios
289 8861126f Georgios Gousios
    chan.connection.close()
290 f30730c0 Georgios Gousios
291 c183005e Georgios Gousios
292 979482ce Georgios Gousios
def drain_queue(queue):
293 e6f5bb10 Vangelis Koukis
    """Strip a (declared) queue from all outstanding messages"""
294 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
295 979482ce Georgios Gousios
    if not queue:
296 979482ce Georgios Gousios
        return
297 979482ce Georgios Gousios
298 698d0666 Georgios Gousios
    if not queue in QUEUES:
299 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
300 979482ce Georgios Gousios
        return
301 979482ce Georgios Gousios
302 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
303 979482ce Georgios Gousios
304 979482ce Georgios Gousios
    if not get_user_confirmation():
305 979482ce Georgios Gousios
        return
306 979482ce Georgios Gousios
    conn = get_connection()
307 979482ce Georgios Gousios
    chan = conn.channel()
308 979482ce Georgios Gousios
309 e6209aa2 Georgios Gousios
    # Register a temporary queue binding
310 698d0666 Georgios Gousios
    for binding in BINDINGS:
311 e6209aa2 Georgios Gousios
        if binding[0] == queue:
312 e6209aa2 Georgios Gousios
            exch = binding[1]
313 e6209aa2 Georgios Gousios
314 e6209aa2 Georgios Gousios
    if not exch:
315 e6209aa2 Georgios Gousios
        print "Queue not bound to any exchange: %s" % queue
316 e6209aa2 Georgios Gousios
        return
317 e6209aa2 Georgios Gousios
318 2bf8d695 Vangelis Koukis
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
319 e6209aa2 Georgios Gousios
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
320 e6209aa2 Georgios Gousios
321 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
322 e6209aa2 Georgios Gousios
    time.sleep(2)
323 e6209aa2 Georgios Gousios
    print "Queue draining starting"
324 e6209aa2 Georgios Gousios
325 e6209aa2 Georgios Gousios
    signal(SIGTERM, _exit_handler)
326 e6209aa2 Georgios Gousios
    signal(SIGINT, _exit_handler)
327 e6209aa2 Georgios Gousios
328 c626e1d0 Georgios Gousios
    num_processed = 0
329 e6209aa2 Georgios Gousios
    while True:
330 e6209aa2 Georgios Gousios
        chan.wait()
331 c626e1d0 Georgios Gousios
        num_processed += 1
332 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
333 c626e1d0 Georgios Gousios
334 e6209aa2 Georgios Gousios
    chan.basic_cancel(tag)
335 979482ce Georgios Gousios
    chan.connection.close()
336 979482ce Georgios Gousios
337 95aee02c Vangelis Koukis
338 979482ce Georgios Gousios
def get_connection():
339 e6f5bb10 Vangelis Koukis
    conn = amqp.Connection(host=settings.RABBIT_HOST,
340 e6f5bb10 Vangelis Koukis
                           userid=settings.RABBIT_USERNAME,
341 e6f5bb10 Vangelis Koukis
                           password=settings.RABBIT_PASSWORD,
342 e6f5bb10 Vangelis Koukis
                           virtual_host=settings.RABBIT_VHOST)
343 979482ce Georgios Gousios
    return conn
344 979482ce Georgios Gousios
345 95aee02c Vangelis Koukis
346 979482ce Georgios Gousios
def get_user_confirmation():
347 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
348 979482ce Georgios Gousios
349 979482ce Georgios Gousios
    if not ans:
350 979482ce Georgios Gousios
        return False
351 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
352 979482ce Georgios Gousios
        return False
353 979482ce Georgios Gousios
    return True
354 979482ce Georgios Gousios
355 979482ce Georgios Gousios
356 57d0082a Georgios Gousios
def debug_mode():
357 2bf8d695 Vangelis Koukis
    disp = Dispatcher(debug=True)
358 838239fa Georgios Gousios
    signal(SIGINT, _exit_handler)
359 838239fa Georgios Gousios
    signal(SIGTERM, _exit_handler)
360 838239fa Georgios Gousios
361 838239fa Georgios Gousios
    disp.wait()
362 838239fa Georgios Gousios
363 838239fa Georgios Gousios
364 d28244af Vangelis Koukis
def daemon_mode(opts):
365 9e98ba3c Giorgos Verigakis
    global children
366 698d0666 Georgios Gousios
367 d28244af Vangelis Koukis
    # Create pidfile,
368 d28244af Vangelis Koukis
    # take care of differences between python-daemon versions
369 4ed2e471 Georgios Gousios
    try:
370 4ed2e471 Georgios Gousios
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
371 4ed2e471 Georgios Gousios
    except:
372 4ed2e471 Georgios Gousios
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
373 4ed2e471 Georgios Gousios
374 de081774 Georgios Gousios
    pidf.acquire()
375 de081774 Georgios Gousios
376 9e98ba3c Giorgos Verigakis
    log.info("Became a daemon")
377 57d0082a Georgios Gousios
378 c183005e Georgios Gousios
    # Fork workers
379 8861126f Georgios Gousios
    children = []
380 8861126f Georgios Gousios
381 8861126f Georgios Gousios
    i = 0
382 8861126f Georgios Gousios
    while i < opts.workers:
383 8861126f Georgios Gousios
        newpid = os.fork()
384 8861126f Georgios Gousios
385 8861126f Georgios Gousios
        if newpid == 0:
386 d28244af Vangelis Koukis
            signal(SIGINT, _exit_handler)
387 c183005e Georgios Gousios
            signal(SIGTERM, _exit_handler)
388 57d0082a Georgios Gousios
            child(sys.argv[1:])
389 4dc0b46a Georgios Gousios
            sys.exit(1)
390 8861126f Georgios Gousios
        else:
391 9e98ba3c Giorgos Verigakis
            log.debug("%d, forked child: %d", os.getpid(), newpid)
392 9e98ba3c Giorgos Verigakis
            children.append(newpid)
393 8861126f Georgios Gousios
        i += 1
394 8861126f Georgios Gousios
395 8d8ea051 Georgios Gousios
    # Catch signals to ensure graceful shutdown
396 d28244af Vangelis Koukis
    signal(SIGINT, _parent_handler)
397 c183005e Georgios Gousios
    signal(SIGTERM, _parent_handler)
398 8861126f Georgios Gousios
399 57d0082a Georgios Gousios
    # Wait for all children processes to die, one by one
400 d28244af Vangelis Koukis
    try:
401 de081774 Georgios Gousios
        for pid in children:
402 de081774 Georgios Gousios
            try:
403 de081774 Georgios Gousios
                os.waitpid(pid, 0)
404 de081774 Georgios Gousios
            except Exception:
405 de081774 Georgios Gousios
                pass
406 de081774 Georgios Gousios
    finally:
407 de081774 Georgios Gousios
        pidf.release()
408 c183005e Georgios Gousios
409 d28244af Vangelis Koukis
410 d28244af Vangelis Koukis
def main():
411 3d975c75 Kostas Papadimitriou
    dictConfig(settings.DISPATCHER_LOGGING)
412 3d975c75 Kostas Papadimitriou
413 9e98ba3c Giorgos Verigakis
    global log
414 3d975c75 Kostas Papadimitriou
415 d28244af Vangelis Koukis
    (opts, args) = parse_arguments(sys.argv[1:])
416 d28244af Vangelis Koukis
417 d28244af Vangelis Koukis
    # Init the global variables containing the queues
418 d28244af Vangelis Koukis
    _init_queues()
419 d28244af Vangelis Koukis
420 d28244af Vangelis Koukis
    # Special case for the clean up queues action
421 d28244af Vangelis Koukis
    if opts.purge_queues:
422 d28244af Vangelis Koukis
        purge_queues()
423 d28244af Vangelis Koukis
        return
424 d28244af Vangelis Koukis
425 d28244af Vangelis Koukis
    # Special case for the clean up exch action
426 d28244af Vangelis Koukis
    if opts.purge_exchanges:
427 d28244af Vangelis Koukis
        purge_exchanges()
428 d28244af Vangelis Koukis
        return
429 d28244af Vangelis Koukis
430 d28244af Vangelis Koukis
    if opts.drain_queue:
431 d28244af Vangelis Koukis
        drain_queue(opts.drain_queue)
432 d28244af Vangelis Koukis
        return
433 d28244af Vangelis Koukis
434 d28244af Vangelis Koukis
    # Debug mode, process messages without spawning workers
435 d28244af Vangelis Koukis
    if opts.debug:
436 d28244af Vangelis Koukis
        debug_mode()
437 d28244af Vangelis Koukis
        return
438 7c62bd54 Kostas Papadimitriou
439 9e98ba3c Giorgos Verigakis
    files_preserve = []
440 9e98ba3c Giorgos Verigakis
    for handler in log.handlers:
441 9e98ba3c Giorgos Verigakis
        stream = getattr(handler, 'stream')
442 9e98ba3c Giorgos Verigakis
        if stream and hasattr(stream, 'fileno'):
443 9e98ba3c Giorgos Verigakis
            files_preserve.append(handler.stream)
444 7c62bd54 Kostas Papadimitriou
445 d28244af Vangelis Koukis
    daemon_context = daemon.DaemonContext(
446 d28244af Vangelis Koukis
        files_preserve=files_preserve,
447 d28244af Vangelis Koukis
        umask=022)
448 7c62bd54 Kostas Papadimitriou
449 d28244af Vangelis Koukis
    daemon_context.open()
450 d28244af Vangelis Koukis
451 d28244af Vangelis Koukis
    # Catch every exception, make sure it gets logged properly
452 d28244af Vangelis Koukis
    try:
453 d28244af Vangelis Koukis
        daemon_mode(opts)
454 d28244af Vangelis Koukis
    except Exception:
455 9e98ba3c Giorgos Verigakis
        log.exception("Unknown error")
456 d28244af Vangelis Koukis
        raise
457 d28244af Vangelis Koukis
458 d28244af Vangelis Koukis
459 7c62bd54 Kostas Papadimitriou
if __name__ == "__main__":
460 3d975c75 Kostas Papadimitriou
    sys.exit(main())
461 7c62bd54 Kostas Papadimitriou
462 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :