Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / dispatcher.py @ fd00a867

History | View | Annotate | Download (14.4 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 48130e66 Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
3 87ace70f Vassilios Karakoidas
#
4 48130e66 Georgios Gousios
# Redistribution and use in source and binary forms, with or without
5 48130e66 Georgios Gousios
# modification, are permitted provided that the following conditions
6 48130e66 Georgios Gousios
# are met:
7 7bd50624 Vassilios Karakoidas
#
8 48130e66 Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
9 48130e66 Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
10 48130e66 Georgios Gousios
#
11 48130e66 Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
12 48130e66 Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
13 48130e66 Georgios Gousios
#     documentation and/or other materials provided with the distribution.
14 48130e66 Georgios Gousios
#
15 48130e66 Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16 48130e66 Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 48130e66 Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 48130e66 Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19 48130e66 Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 48130e66 Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 48130e66 Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 48130e66 Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 48130e66 Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 48130e66 Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 48130e66 Georgios Gousios
# SUCH DAMAGE.
26 48130e66 Georgios Gousios
#
27 48130e66 Georgios Gousios
# The views and conclusions contained in the software and documentation are
28 48130e66 Georgios Gousios
# those of the authors and should not be interpreted as representing official
29 48130e66 Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
30 48130e66 Georgios Gousios
31 48130e66 Georgios Gousios
32 e6209aa2 Georgios Gousios
""" Message queue setup, dispatch and admin
33 c183005e Georgios Gousios

34 2cd99e7a Georgios Gousios
This program sets up connections to the queues configured in settings.py
35 2cd99e7a Georgios Gousios
and implements the message wait and dispatch loops. Actual messages are
36 2cd99e7a Georgios Gousios
handled in the dispatched functions.
37 fcbc5bb3 Vassilios Karakoidas

38 d08a5f6f Vangelis Koukis
"""
39 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
40 c99fe4c7 Vassilios Karakoidas
41 d08a5f6f Vangelis Koukis
import sys
42 86221fd5 Panos Louridas
import os
43 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44 86221fd5 Panos Louridas
sys.path.append(path)
45 d08a5f6f Vangelis Koukis
46 de470b1e Kostas Papadimitriou
from synnefo import settings
47 d08a5f6f Vangelis Koukis
setup_environ(settings)
48 d08a5f6f Vangelis Koukis
49 da102335 Georgios Gousios
from amqplib import client_0_8 as amqp
50 8861126f Georgios Gousios
from signal import signal, SIGINT, SIGTERM
51 23c84263 Georgios Gousios
52 9e98ba3c Giorgos Verigakis
import logging
53 8d8ea051 Georgios Gousios
import time
54 8d8ea051 Georgios Gousios
import socket
55 4ed2e471 Georgios Gousios
from daemon import daemon
56 4ed2e471 Georgios Gousios
57 4ed2e471 Georgios Gousios
# Take care of differences between python-daemon versions.
58 4ed2e471 Georgios Gousios
try:
59 4ed2e471 Georgios Gousios
    from daemon import pidfile
60 4ed2e471 Georgios Gousios
except:
61 4ed2e471 Georgios Gousios
    from daemon import pidlockfile
62 8d8ea051 Georgios Gousios
63 9cb903f9 Vangelis Koukis
from synnefo.logic import callbacks
64 9e98ba3c Giorgos Verigakis
from synnefo.util.dictconfig import dictConfig
65 9e98ba3c Giorgos Verigakis
66 9e98ba3c Giorgos Verigakis
67 9e98ba3c Giorgos Verigakis
log = logging.getLogger()
68 9e98ba3c Giorgos Verigakis
69 c99fe4c7 Vassilios Karakoidas
70 698d0666 Georgios Gousios
# Queue names
71 698d0666 Georgios Gousios
QUEUES = []
72 698d0666 Georgios Gousios
73 698d0666 Georgios Gousios
# Queue bindings to exchanges
74 698d0666 Georgios Gousios
BINDINGS = []
75 698d0666 Georgios Gousios
76 2bf8d695 Vangelis Koukis
77 78e2d194 Georgios Gousios
class Dispatcher:
78 78e2d194 Georgios Gousios
    chan = None
79 5d081749 Georgios Gousios
    debug = False
80 5d081749 Georgios Gousios
    clienttags = []
81 78e2d194 Georgios Gousios
82 2bf8d695 Vangelis Koukis
    def __init__(self, debug=False):
83 5d081749 Georgios Gousios
        self.debug = debug
84 5d081749 Georgios Gousios
        self._init()
85 da102335 Georgios Gousios
86 78e2d194 Georgios Gousios
    def wait(self):
87 78e2d194 Georgios Gousios
        while True:
88 78e2d194 Georgios Gousios
            try:
89 78e2d194 Georgios Gousios
                self.chan.wait()
90 78e2d194 Georgios Gousios
            except SystemExit:
91 78e2d194 Georgios Gousios
                break
92 cadaffb1 Georgios Gousios
            except amqp.exceptions.AMQPConnectionException:
93 9e98ba3c Giorgos Verigakis
                log.error("Server went away, reconnecting...")
94 cadaffb1 Georgios Gousios
                self._init()
95 5d081749 Georgios Gousios
            except socket.error:
96 9e98ba3c Giorgos Verigakis
                log.error("Server went away, reconnecting...")
97 5d081749 Georgios Gousios
                self._init()
98 e6f5bb10 Vangelis Koukis
            except Exception, e:
99 9e98ba3c Giorgos Verigakis
                log.exception("Caught unexpected exception")
100 78e2d194 Georgios Gousios
101 5d081749 Georgios Gousios
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
102 78e2d194 Georgios Gousios
        self.chan.connection.close()
103 838239fa Georgios Gousios
        self.chan.close()
104 78e2d194 Georgios Gousios
105 5d081749 Georgios Gousios
    def _init(self):
106 698d0666 Georgios Gousios
        global QUEUES, BINDINGS
107 9e98ba3c Giorgos Verigakis
        log.info("Initializing")
108 226f086a Georgios Gousios
109 c183005e Georgios Gousios
        # Connect to RabbitMQ
110 23c84263 Georgios Gousios
        conn = None
111 23c84263 Georgios Gousios
        while conn == None:
112 9e98ba3c Giorgos Verigakis
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
113 23c84263 Georgios Gousios
            try:
114 41f2249e Vangelis Koukis
                conn = amqp.Connection(host=settings.RABBIT_HOST,
115 41f2249e Vangelis Koukis
                                       userid=settings.RABBIT_USERNAME,
116 41f2249e Vangelis Koukis
                                       password=settings.RABBIT_PASSWORD,
117 41f2249e Vangelis Koukis
                                       virtual_host=settings.RABBIT_VHOST)
118 23c84263 Georgios Gousios
            except socket.error:
119 9e98ba3c Giorgos Verigakis
                log.error("Failed to connect to %s, retrying in 10s",
120 d28244af Vangelis Koukis
                                  settings.RABBIT_HOST)
121 d28244af Vangelis Koukis
                time.sleep(10)
122 23c84263 Georgios Gousios
123 9e98ba3c Giorgos Verigakis
        log.info("Connection succesful, opening channel")
124 23c84263 Georgios Gousios
        self.chan = conn.channel()
125 78e2d194 Georgios Gousios
126 c183005e Georgios Gousios
        # Declare queues and exchanges
127 f30730c0 Georgios Gousios
        for exchange in settings.EXCHANGES:
128 c183005e Georgios Gousios
            self.chan.exchange_declare(exchange=exchange, type="topic",
129 c183005e Georgios Gousios
                                       durable=True, auto_delete=False)
130 f30730c0 Georgios Gousios
131 226f086a Georgios Gousios
        for queue in QUEUES:
132 c183005e Georgios Gousios
            self.chan.queue_declare(queue=queue, durable=True,
133 c183005e Georgios Gousios
                                    exclusive=False, auto_delete=False)
134 78e2d194 Georgios Gousios
135 226f086a Georgios Gousios
        bindings = BINDINGS
136 78e2d194 Georgios Gousios
137 c183005e Georgios Gousios
        # Bind queues to handler methods
138 5d081749 Georgios Gousios
        for binding in bindings:
139 78e2d194 Georgios Gousios
            try:
140 9cb903f9 Vangelis Koukis
                callback = getattr(callbacks, binding[3])
141 23c84263 Georgios Gousios
            except AttributeError:
142 9e98ba3c Giorgos Verigakis
                log.error("Cannot find callback %s", binding[3])
143 e6f5bb10 Vangelis Koukis
                raise SystemExit(1)
144 8d8ea051 Georgios Gousios
145 c183005e Georgios Gousios
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
146 c183005e Georgios Gousios
                                 routing_key=binding[2])
147 2cd99e7a Georgios Gousios
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
148 9e98ba3c Giorgos Verigakis
            log.debug("Binding %s(%s) to queue %s with handler %s",
149 9e98ba3c Giorgos Verigakis
                              binding[1], binding[2], binding[0], binding[3])
150 23c84263 Georgios Gousios
            self.clienttags.append(tag)
151 8d8ea051 Georgios Gousios
152 c183005e Georgios Gousios
153 698d0666 Georgios Gousios
def _init_queues():
154 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
155 698d0666 Georgios Gousios
156 698d0666 Georgios Gousios
    # Queue declarations
157 698d0666 Georgios Gousios
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
158 698d0666 Georgios Gousios
159 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_OP = "%s-events-op" % prefix
160 698d0666 Georgios Gousios
    QUEUE_GANETI_EVENTS_NET = "%s-events-net" % prefix
161 9068cd85 Georgios Gousios
    QUEUE_GANETI_BUILD_PROGR = "%s-events-progress" % prefix
162 698d0666 Georgios Gousios
    QUEUE_CRON_CREDITS = "%s-credits" % prefix
163 698d0666 Georgios Gousios
    QUEUE_EMAIL = "%s-email" % prefix
164 698d0666 Georgios Gousios
    QUEUE_RECONC = "%s-reconciliation" % prefix
165 698d0666 Georgios Gousios
    if settings.DEBUG is True:
166 698d0666 Georgios Gousios
        QUEUE_DEBUG = "debug"       # Debug queue, retrieves all messages
167 698d0666 Georgios Gousios
168 698d0666 Georgios Gousios
    QUEUES = (QUEUE_GANETI_EVENTS_OP, QUEUE_GANETI_EVENTS_NET,
169 9068cd85 Georgios Gousios
              QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_RECONC,
170 9068cd85 Georgios Gousios
              QUEUE_GANETI_BUILD_PROGR)
171 698d0666 Georgios Gousios
172 698d0666 Georgios Gousios
    # notifications of type "ganeti-op-status"
173 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_OP = 'ganeti.%s.event.op' % prefix
174 698d0666 Georgios Gousios
    # notifications of type "ganeti-net-status"
175 2bf8d695 Vangelis Koukis
    DB_HANDLER_KEY_NET = 'ganeti.%s.event.net' % prefix
176 e6f5bb10 Vangelis Koukis
    # notifications of type "ganeti-create-progress"
177 e6f5bb10 Vangelis Koukis
    BUILD_MONITOR_HANDLER = 'ganeti.%s.event.progress' % prefix
178 633d9cfa Georgios Gousios
    # email
179 633d9cfa Georgios Gousios
    EMAIL_HANDLER = 'logic.%s.email.*' % prefix
180 0230cd3a Vangelis Koukis
    # reconciliation
181 633d9cfa Georgios Gousios
    RECONC_HANDLER = 'reconciliation.%s.*' % prefix
182 698d0666 Georgios Gousios
183 698d0666 Georgios Gousios
    BINDINGS = [
184 9068cd85 Georgios Gousios
    # Queue                   # Exchange                # RouteKey              # Handler
185 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_OP,  settings.EXCHANGE_GANETI, DB_HANDLER_KEY_OP,      'update_db'),
186 9068cd85 Georgios Gousios
    (QUEUE_GANETI_EVENTS_NET, settings.EXCHANGE_GANETI, DB_HANDLER_KEY_NET,     'update_net'),
187 9068cd85 Georgios Gousios
    (QUEUE_GANETI_BUILD_PROGR,settings.EXCHANGE_GANETI, BUILD_MONITOR_HANDLER,  'update_build_progress'),
188 9068cd85 Georgios Gousios
    (QUEUE_CRON_CREDITS,      settings.EXCHANGE_CRON,   '*.credits.*',          'update_credits'),
189 633d9cfa Georgios Gousios
    (QUEUE_EMAIL,             settings.EXCHANGE_API,    EMAIL_HANDLER,          'send_email'),
190 633d9cfa Georgios Gousios
    (QUEUE_EMAIL,             settings.EXCHANGE_CRON,   EMAIL_HANDLER,          'send_email'),
191 633d9cfa Georgios Gousios
    (QUEUE_RECONC,            settings.EXCHANGE_CRON,   RECONC_HANDLER,         'trigger_status_update'),
192 698d0666 Georgios Gousios
    ]
193 698d0666 Georgios Gousios
194 698d0666 Georgios Gousios
    if settings.DEBUG is True:
195 698d0666 Georgios Gousios
        BINDINGS += [
196 698d0666 Georgios Gousios
            # Queue       # Exchange          # RouteKey  # Handler
197 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_GANETI, '#',  'dummy_proc'),
198 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_CRON,   '#',  'dummy_proc'),
199 698d0666 Georgios Gousios
            (QUEUE_DEBUG, settings.EXCHANGE_API,    '#',  'dummy_proc'),
200 698d0666 Georgios Gousios
        ]
201 698d0666 Georgios Gousios
        QUEUES += (QUEUE_DEBUG,)
202 698d0666 Georgios Gousios
203 698d0666 Georgios Gousios
204 c183005e Georgios Gousios
def _exit_handler(signum, frame):
205 d28244af Vangelis Koukis
    """"Catch exit signal in children processes"""
206 9e98ba3c Giorgos Verigakis
    log.info("Caught signal %d, will raise SystemExit", signum)
207 8d8ea051 Georgios Gousios
    raise SystemExit
208 8d8ea051 Georgios Gousios
209 c183005e Georgios Gousios
210 c183005e Georgios Gousios
def _parent_handler(signum, frame):
211 c183005e Georgios Gousios
    """"Catch exit signal in parent process and forward it to children."""
212 9e98ba3c Giorgos Verigakis
    global children
213 9e98ba3c Giorgos Verigakis
    log.info("Caught signal %d, sending SIGTERM to children %s",
214 d28244af Vangelis Koukis
                signum, children)
215 8861126f Georgios Gousios
    [os.kill(pid, SIGTERM) for pid in children]
216 8861126f Georgios Gousios
217 c183005e Georgios Gousios
218 57d0082a Georgios Gousios
def child(cmdline):
219 c183005e Georgios Gousios
    """The context of the child process"""
220 c183005e Georgios Gousios
221 c183005e Georgios Gousios
    # Cmd line argument parsing
222 78e2d194 Georgios Gousios
    (opts, args) = parse_arguments(cmdline)
223 d28244af Vangelis Koukis
    disp = Dispatcher(debug=opts.debug)
224 78e2d194 Georgios Gousios
225 c183005e Georgios Gousios
    # Start the event loop
226 2cd99e7a Georgios Gousios
    disp.wait()
227 78e2d194 Georgios Gousios
228 c183005e Georgios Gousios
229 78e2d194 Georgios Gousios
def parse_arguments(args):
230 78e2d194 Georgios Gousios
    from optparse import OptionParser
231 78e2d194 Georgios Gousios
232 3d975c75 Kostas Papadimitriou
    default_pid_file = os.path.join("var","run","synnefo","dispatcher.pid")
233 78e2d194 Georgios Gousios
    parser = OptionParser()
234 c183005e Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", default=False,
235 2cd99e7a Georgios Gousios
                      dest="debug", help="Enable debug mode")
236 e6209aa2 Georgios Gousios
    parser.add_option("-w", "--workers", default=2, dest="workers",
237 e6209aa2 Georgios Gousios
                      help="Number of workers to spawn", type="int")
238 3d975c75 Kostas Papadimitriou
    parser.add_option("-p", "--pid-file", dest="pid_file",
239 3d975c75 Kostas Papadimitriou
                      default=default_pid_file,
240 3d975c75 Kostas Papadimitriou
                      help="Save PID to file (default: %s)" % default_pid_file)
241 979482ce Georgios Gousios
    parser.add_option("--purge-queues", action="store_true",
242 979482ce Georgios Gousios
                      default=False, dest="purge_queues",
243 57d0082a Georgios Gousios
                      help="Remove all declared queues (DANGEROUS!)")
244 979482ce Georgios Gousios
    parser.add_option("--purge-exchanges", action="store_true",
245 979482ce Georgios Gousios
                      default=False, dest="purge_exchanges",
246 979482ce Georgios Gousios
                      help="Remove all exchanges. Implies deleting all queues \
247 979482ce Georgios Gousios
                           first (DANGEROUS!)")
248 d5470cdd Georgios Gousios
    parser.add_option("--drain-queue", dest="drain_queue",
249 e6209aa2 Georgios Gousios
                      help="Strips a queue from all outstanding messages")
250 de081774 Georgios Gousios
251 78e2d194 Georgios Gousios
    return parser.parse_args(args)
252 78e2d194 Georgios Gousios
253 f30730c0 Georgios Gousios
254 d28244af Vangelis Koukis
def purge_queues():
255 979482ce Georgios Gousios
    """
256 979482ce Georgios Gousios
        Delete declared queues from RabbitMQ. Use with care!
257 979482ce Georgios Gousios
    """
258 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
259 979482ce Georgios Gousios
    conn = get_connection()
260 f30730c0 Georgios Gousios
    chan = conn.channel()
261 f30730c0 Georgios Gousios
262 698d0666 Georgios Gousios
    print "Queues to be deleted: ", QUEUES
263 f30730c0 Georgios Gousios
264 979482ce Georgios Gousios
    if not get_user_confirmation():
265 f30730c0 Georgios Gousios
        return
266 f30730c0 Georgios Gousios
267 698d0666 Georgios Gousios
    for queue in QUEUES:
268 f30730c0 Georgios Gousios
        try:
269 f30730c0 Georgios Gousios
            chan.queue_delete(queue=queue)
270 979482ce Georgios Gousios
            print "Deleting queue %s" % queue
271 979482ce Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
272 979482ce Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
273 979482ce Georgios Gousios
            chan = conn.channel()
274 979482ce Georgios Gousios
275 979482ce Georgios Gousios
    chan.connection.close()
276 979482ce Georgios Gousios
277 979482ce Georgios Gousios
278 979482ce Georgios Gousios
def purge_exchanges():
279 e6f5bb10 Vangelis Koukis
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
280 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
281 979482ce Georgios Gousios
    purge_queues()
282 979482ce Georgios Gousios
283 979482ce Georgios Gousios
    conn = get_connection()
284 979482ce Georgios Gousios
    chan = conn.channel()
285 979482ce Georgios Gousios
286 e6f5bb10 Vangelis Koukis
    print "Exchanges to be deleted: ", settings.EXCHANGES
287 979482ce Georgios Gousios
288 979482ce Georgios Gousios
    if not get_user_confirmation():
289 979482ce Georgios Gousios
        return
290 979482ce Georgios Gousios
291 979482ce Georgios Gousios
    for exchange in settings.EXCHANGES:
292 979482ce Georgios Gousios
        try:
293 979482ce Georgios Gousios
            chan.exchange_delete(exchange=exchange)
294 f30730c0 Georgios Gousios
        except amqp.exceptions.AMQPChannelException as e:
295 f30730c0 Georgios Gousios
            print e.amqp_reply_code, " ", e.amqp_reply_text
296 979482ce Georgios Gousios
297 8861126f Georgios Gousios
    chan.connection.close()
298 f30730c0 Georgios Gousios
299 c183005e Georgios Gousios
300 979482ce Georgios Gousios
def drain_queue(queue):
301 e6f5bb10 Vangelis Koukis
    """Strip a (declared) queue from all outstanding messages"""
302 698d0666 Georgios Gousios
    global QUEUES, BINDINGS
303 979482ce Georgios Gousios
    if not queue:
304 979482ce Georgios Gousios
        return
305 979482ce Georgios Gousios
306 698d0666 Georgios Gousios
    if not queue in QUEUES:
307 979482ce Georgios Gousios
        print "Queue %s not configured" % queue
308 979482ce Georgios Gousios
        return
309 979482ce Georgios Gousios
310 979482ce Georgios Gousios
    print "Queue to be drained: %s" % queue
311 979482ce Georgios Gousios
312 979482ce Georgios Gousios
    if not get_user_confirmation():
313 979482ce Georgios Gousios
        return
314 979482ce Georgios Gousios
    conn = get_connection()
315 979482ce Georgios Gousios
    chan = conn.channel()
316 979482ce Georgios Gousios
317 e6209aa2 Georgios Gousios
    # Register a temporary queue binding
318 698d0666 Georgios Gousios
    for binding in BINDINGS:
319 e6209aa2 Georgios Gousios
        if binding[0] == queue:
320 e6209aa2 Georgios Gousios
            exch = binding[1]
321 e6209aa2 Georgios Gousios
322 e6209aa2 Georgios Gousios
    if not exch:
323 e6209aa2 Georgios Gousios
        print "Queue not bound to any exchange: %s" % queue
324 e6209aa2 Georgios Gousios
        return
325 e6209aa2 Georgios Gousios
326 2bf8d695 Vangelis Koukis
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
327 e6209aa2 Georgios Gousios
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
328 e6209aa2 Georgios Gousios
329 e6209aa2 Georgios Gousios
    print "Queue draining about to start, hit Ctrl+c when done"
330 e6209aa2 Georgios Gousios
    time.sleep(2)
331 e6209aa2 Georgios Gousios
    print "Queue draining starting"
332 e6209aa2 Georgios Gousios
333 e6209aa2 Georgios Gousios
    signal(SIGTERM, _exit_handler)
334 e6209aa2 Georgios Gousios
    signal(SIGINT, _exit_handler)
335 e6209aa2 Georgios Gousios
336 c626e1d0 Georgios Gousios
    num_processed = 0
337 e6209aa2 Georgios Gousios
    while True:
338 e6209aa2 Georgios Gousios
        chan.wait()
339 c626e1d0 Georgios Gousios
        num_processed += 1
340 c626e1d0 Georgios Gousios
        sys.stderr.write("Ignored %d messages\r" % num_processed)
341 c626e1d0 Georgios Gousios
342 e6209aa2 Georgios Gousios
    chan.basic_cancel(tag)
343 979482ce Georgios Gousios
    chan.connection.close()
344 979482ce Georgios Gousios
345 95aee02c Vangelis Koukis
346 979482ce Georgios Gousios
def get_connection():
347 e6f5bb10 Vangelis Koukis
    conn = amqp.Connection(host=settings.RABBIT_HOST,
348 e6f5bb10 Vangelis Koukis
                           userid=settings.RABBIT_USERNAME,
349 e6f5bb10 Vangelis Koukis
                           password=settings.RABBIT_PASSWORD,
350 e6f5bb10 Vangelis Koukis
                           virtual_host=settings.RABBIT_VHOST)
351 979482ce Georgios Gousios
    return conn
352 979482ce Georgios Gousios
353 95aee02c Vangelis Koukis
354 979482ce Georgios Gousios
def get_user_confirmation():
355 979482ce Georgios Gousios
    ans = raw_input("Are you sure (N/y):")
356 979482ce Georgios Gousios
357 979482ce Georgios Gousios
    if not ans:
358 979482ce Georgios Gousios
        return False
359 979482ce Georgios Gousios
    if ans not in ['Y', 'y']:
360 979482ce Georgios Gousios
        return False
361 979482ce Georgios Gousios
    return True
362 979482ce Georgios Gousios
363 979482ce Georgios Gousios
364 57d0082a Georgios Gousios
def debug_mode():
365 2bf8d695 Vangelis Koukis
    disp = Dispatcher(debug=True)
366 838239fa Georgios Gousios
    signal(SIGINT, _exit_handler)
367 838239fa Georgios Gousios
    signal(SIGTERM, _exit_handler)
368 838239fa Georgios Gousios
369 838239fa Georgios Gousios
    disp.wait()
370 838239fa Georgios Gousios
371 838239fa Georgios Gousios
372 d28244af Vangelis Koukis
def daemon_mode(opts):
373 9e98ba3c Giorgos Verigakis
    global children
374 698d0666 Georgios Gousios
375 d28244af Vangelis Koukis
    # Create pidfile,
376 d28244af Vangelis Koukis
    # take care of differences between python-daemon versions
377 4ed2e471 Georgios Gousios
    try:
378 4ed2e471 Georgios Gousios
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
379 4ed2e471 Georgios Gousios
    except:
380 4ed2e471 Georgios Gousios
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
381 4ed2e471 Georgios Gousios
382 de081774 Georgios Gousios
    pidf.acquire()
383 de081774 Georgios Gousios
384 9e98ba3c Giorgos Verigakis
    log.info("Became a daemon")
385 57d0082a Georgios Gousios
386 c183005e Georgios Gousios
    # Fork workers
387 8861126f Georgios Gousios
    children = []
388 8861126f Georgios Gousios
389 8861126f Georgios Gousios
    i = 0
390 8861126f Georgios Gousios
    while i < opts.workers:
391 8861126f Georgios Gousios
        newpid = os.fork()
392 8861126f Georgios Gousios
393 8861126f Georgios Gousios
        if newpid == 0:
394 d28244af Vangelis Koukis
            signal(SIGINT, _exit_handler)
395 c183005e Georgios Gousios
            signal(SIGTERM, _exit_handler)
396 57d0082a Georgios Gousios
            child(sys.argv[1:])
397 4dc0b46a Georgios Gousios
            sys.exit(1)
398 8861126f Georgios Gousios
        else:
399 9e98ba3c Giorgos Verigakis
            log.debug("%d, forked child: %d", os.getpid(), newpid)
400 9e98ba3c Giorgos Verigakis
            children.append(newpid)
401 8861126f Georgios Gousios
        i += 1
402 8861126f Georgios Gousios
403 8d8ea051 Georgios Gousios
    # Catch signals to ensure graceful shutdown
404 d28244af Vangelis Koukis
    signal(SIGINT, _parent_handler)
405 c183005e Georgios Gousios
    signal(SIGTERM, _parent_handler)
406 8861126f Georgios Gousios
407 57d0082a Georgios Gousios
    # Wait for all children processes to die, one by one
408 d28244af Vangelis Koukis
    try:
409 de081774 Georgios Gousios
        for pid in children:
410 de081774 Georgios Gousios
            try:
411 de081774 Georgios Gousios
                os.waitpid(pid, 0)
412 de081774 Georgios Gousios
            except Exception:
413 de081774 Georgios Gousios
                pass
414 de081774 Georgios Gousios
    finally:
415 de081774 Georgios Gousios
        pidf.release()
416 c183005e Georgios Gousios
417 d28244af Vangelis Koukis
418 d28244af Vangelis Koukis
def main():
419 3d975c75 Kostas Papadimitriou
    dictConfig(settings.DISPATCHER_LOGGING)
420 3d975c75 Kostas Papadimitriou
421 9e98ba3c Giorgos Verigakis
    global log
422 3d975c75 Kostas Papadimitriou
423 d28244af Vangelis Koukis
    (opts, args) = parse_arguments(sys.argv[1:])
424 d28244af Vangelis Koukis
425 d28244af Vangelis Koukis
    # Init the global variables containing the queues
426 d28244af Vangelis Koukis
    _init_queues()
427 d28244af Vangelis Koukis
428 d28244af Vangelis Koukis
    # Special case for the clean up queues action
429 d28244af Vangelis Koukis
    if opts.purge_queues:
430 d28244af Vangelis Koukis
        purge_queues()
431 d28244af Vangelis Koukis
        return
432 d28244af Vangelis Koukis
433 d28244af Vangelis Koukis
    # Special case for the clean up exch action
434 d28244af Vangelis Koukis
    if opts.purge_exchanges:
435 d28244af Vangelis Koukis
        purge_exchanges()
436 d28244af Vangelis Koukis
        return
437 d28244af Vangelis Koukis
438 d28244af Vangelis Koukis
    if opts.drain_queue:
439 d28244af Vangelis Koukis
        drain_queue(opts.drain_queue)
440 d28244af Vangelis Koukis
        return
441 d28244af Vangelis Koukis
442 d28244af Vangelis Koukis
    # Debug mode, process messages without spawning workers
443 d28244af Vangelis Koukis
    if opts.debug:
444 d28244af Vangelis Koukis
        debug_mode()
445 d28244af Vangelis Koukis
        return
446 7c62bd54 Kostas Papadimitriou
447 9e98ba3c Giorgos Verigakis
    files_preserve = []
448 9e98ba3c Giorgos Verigakis
    for handler in log.handlers:
449 9e98ba3c Giorgos Verigakis
        stream = getattr(handler, 'stream')
450 9e98ba3c Giorgos Verigakis
        if stream and hasattr(stream, 'fileno'):
451 9e98ba3c Giorgos Verigakis
            files_preserve.append(handler.stream)
452 7c62bd54 Kostas Papadimitriou
453 d28244af Vangelis Koukis
    daemon_context = daemon.DaemonContext(
454 d28244af Vangelis Koukis
        files_preserve=files_preserve,
455 d28244af Vangelis Koukis
        umask=022)
456 7c62bd54 Kostas Papadimitriou
457 d28244af Vangelis Koukis
    daemon_context.open()
458 d28244af Vangelis Koukis
459 d28244af Vangelis Koukis
    # Catch every exception, make sure it gets logged properly
460 d28244af Vangelis Koukis
    try:
461 d28244af Vangelis Koukis
        daemon_mode(opts)
462 d28244af Vangelis Koukis
    except Exception:
463 9e98ba3c Giorgos Verigakis
        log.exception("Unknown error")
464 d28244af Vangelis Koukis
        raise
465 d28244af Vangelis Koukis
466 d28244af Vangelis Koukis
467 7c62bd54 Kostas Papadimitriou
if __name__ == "__main__":
468 3d975c75 Kostas Papadimitriou
    sys.exit(main())
469 7c62bd54 Kostas Papadimitriou
470 8d8ea051 Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :