Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 48130e66

History | View | Annotate | Download (9.6 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup and dispatch
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39

    
40
from django.core.management import setup_environ
41

    
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
import synnefo.settings as settings
47

    
48
setup_environ(settings)
49

    
50
from amqplib import client_0_8 as amqp
51
from signal import signal, SIGINT, SIGTERM
52

    
53
import logging
54
import logging.config
55
import time
56
import socket
57
from daemon import pidfile, daemon
58
import lockfile.pidlockfile
59

    
60
from synnefo.logic import dispatcher_callbacks
61

    
62

    
63
class Dispatcher:
64

    
65
    logger = None
66
    chan = None
67
    debug = False
68
    clienttags = []
69

    
70
    def __init__(self, debug = False):
71
        # Initialize logger
72
        logging.config.fileConfig("/Volumes/Files/Developer/grnet/synnefo/logging.conf")
73
        self.logger = logging.getLogger("synnefo.dispatcher")
74

    
75
        self.debug = debug
76
        self._init()
77

    
78
    def wait(self):
79
        while True:
80
            try:
81
                self.chan.wait()
82
            except SystemExit:
83
                break
84
            except amqp.exceptions.AMQPConnectionException:
85
                self.logger.error("Server went away, reconnecting...")
86
                self._init()
87
            except socket.error:
88
                self.logger.error("Server went away, reconnecting...")
89
                self._init()
90

    
91
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
92
        self.chan.connection.close()
93
        self.chan.close()
94

    
95
    def _init(self):
96
        self.logger.info("Initializing")
97
        
98
        # Connect to RabbitMQ
99
        conn = None
100
        while conn == None:
101
            self.logger.info("Attempting to connect to %s",
102
                             settings.RABBIT_HOST)
103
            try:
104
                conn = amqp.Connection(host=settings.RABBIT_HOST,
105
                                       userid=settings.RABBIT_USERNAME,
106
                                       password=settings.RABBIT_PASSWORD,
107
                                       virtual_host=settings.RABBIT_VHOST)
108
            except socket.error:
109
                time.sleep(1)
110

    
111
        self.logger.info("Connection succesful, opening channel")
112
        self.chan = conn.channel()
113

    
114
        # Declare queues and exchanges
115
        for exchange in settings.EXCHANGES:
116
            self.chan.exchange_declare(exchange=exchange, type="topic",
117
                                       durable=True, auto_delete=False)
118

    
119
        for queue in settings.QUEUES:
120
            self.chan.queue_declare(queue=queue, durable=True,
121
                                    exclusive=False, auto_delete=False)
122

    
123
        bindings = settings.BINDINGS
124

    
125
        # Special queue for debugging, should not appear in production
126
        if self.debug:
127
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
128
                                    exclusive=False, auto_delete=False)
129
            bindings += settings.BINDINGS_DEBUG
130

    
131
        # Bind queues to handler methods
132
        for binding in bindings:
133
            try:
134
                callback = getattr(dispatcher_callbacks, binding[3])
135
            except AttributeError:
136
                self.logger.error("Cannot find callback %s" % binding[3])
137
                continue
138

    
139
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
140
                                 routing_key=binding[2])
141
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
142
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
143
                              (binding[1], binding[2], binding[0], binding[3]))
144
            self.clienttags.append(tag)
145

    
146

    
147
def _exit_handler(signum, frame):
148
    """"Catch exit signal in children processes."""
149
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
150
    raise SystemExit
151

    
152

    
153
def _parent_handler(signum, frame):
154
    """"Catch exit signal in parent process and forward it to children."""
155
    global children
156
    print "Caught signal %d, sending kill signal to children" % signum
157
    [os.kill(pid, SIGTERM) for pid in children]
158

    
159

    
160
def child(cmdline):
161
    """The context of the child process"""
162

    
163
    # Cmd line argument parsing
164
    (opts, args) = parse_arguments(cmdline)
165
    disp = Dispatcher(debug = opts.debug)
166

    
167
    # Start the event loop
168
    disp.wait()
169

    
170

    
171
def parse_arguments(args):
172
    from optparse import OptionParser
173

    
174
    parser = OptionParser()
175
    parser.add_option("-d", "--debug", action="store_true", default=False,
176
                      dest="debug", help="Enable debug mode")
177
    parser.add_option("-l", "--log", dest="log_file",
178
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
179
                      help="Write log to FILE instead of %s" %
180
                           settings.DISPATCHER_LOG_FILE)
181
    parser.add_option("-c", "--cleanup-queues", action="store_true",
182
                      default=False, dest="cleanup_queues",
183
                      help="Remove all declared queues (DANGEROUS!)")
184
    parser.add_option("-w", "--workers", default=2, dest="workers",
185
                      help="Number of workers to spawn", type="int")
186
    
187
    return parser.parse_args(args)
188

    
189

    
190
def cleanup_queues() :
191
    """Delete declared queues from RabbitMQ. Use with care!"""
192
    conn = amqp.Connection( host=settings.RABBIT_HOST,
193
                            userid=settings.RABBIT_USERNAME,
194
                            password=settings.RABBIT_PASSWORD,
195
                            virtual_host=settings.RABBIT_VHOST)
196
    chan = conn.channel()
197

    
198
    print "Queues to be deleted: ",  settings.QUEUES
199
    print "Exchnages to be deleted: ", settings.EXCHANGES
200
    ans = raw_input("Are you sure (N/y):")
201

    
202
    if not ans:
203
        return
204
    if ans not in ['Y', 'y']:
205
        return
206

    
207
    #for exchange in settings.EXCHANGES:
208
    #    try:
209
    #        chan.exchange_delete(exchange=exchange)
210
    #    except amqp.exceptions.AMQPChannelException as e:
211
    #        print e.amqp_reply_code, " ", e.amqp_reply_text
212

    
213
    for queue in settings.QUEUES:
214
        try:
215
            chan.queue_delete(queue=queue)
216
        except amqp.exceptions.AMQPChannelException as e:
217
            print e.amqp_reply_code, " ", e.amqp_reply_text
218
    chan.close()
219
    chan.connection.close()
220

    
221

    
222
def debug_mode():
223
    disp = Dispatcher(debug = True)
224
    signal(SIGINT, _exit_handler)
225
    signal(SIGTERM, _exit_handler)
226

    
227
    disp.wait()
228

    
229

    
230
def main():
231
    global children, logger
232
    (opts, args) = parse_arguments(sys.argv[1:])
233

    
234
    # Initialize logger
235
    logging.config.fileConfig("logging.conf")
236
    logger = logging.getLogger("synnefo.dispatcher")
237

    
238
    # Special case for the clean up queues action
239
    if opts.cleanup_queues:
240
        cleanup_queues()
241
        return
242

    
243
    # Debug mode, process messages without spawning workers
244
    if opts.debug:
245
        debug_mode()
246
        return
247

    
248
    # Create pidfile
249
    pidf = pidfile.TimeoutPIDLockFile("/Volumes/Files/Developer/grnet/synnefo/dispatcher.pid", 10)
250
    pidf.acquire()
251
    pidf.__enter__()
252

    
253
    # Become a daemon
254
    daemon_context = daemon.DaemonContext(
255
        stdout=sys.stdout,
256
        stderr=sys.stderr,
257
        umask=022)
258

    
259
    daemon_context.open()
260
    logger.info("Became a daemon")
261

    
262
    # Fork workers
263
    children = []
264

    
265
    i = 0
266
    while i < opts.workers:
267
        newpid = os.fork()
268

    
269
        if newpid == 0:
270
            signal(SIGINT,  _exit_handler)
271
            signal(SIGTERM, _exit_handler)
272
            child(sys.argv[1:])
273
            sys.exit(1)
274
        else:
275
            pids = (os.getpid(), newpid)
276
            logger.debug("%d, forked child: %d" % pids)
277
            children.append(pids[1])
278
        i += 1
279

    
280
    # Catch signals to ensure graceful shutdown
281
    signal(SIGINT,  _parent_handler)
282
    signal(SIGTERM, _parent_handler)
283

    
284
    # Wait for all children processes to die, one by one
285
    for pid in children:
286
        try:
287
            os.waitpid(pid, 0)
288
        except Exception:
289
            pass
290

    
291
    pidf.release()
292
    pidf.__exit__()
293

    
294
if __name__ == "__main__":
295
    logging.basicConfig(level=logging.DEBUG)
296
    sys.exit(main())
297

    
298
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :