Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 4dc0b46a

History | View | Annotate | Download (8.2 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2011 Greek Research and Technology Network
4
#
5
""" Message queue setup and dispatch
6

7
This program sets up connections to the queues configured in settings.py
8
and implements the message wait and dispatch loops. Actual messages are
9
handled in the dispatched functions.
10

11
"""
12

    
13
from django.core.management import setup_environ
14

    
15
import sys
16
import os
17
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
18
sys.path.append(path)
19
import synnefo.settings as settings
20

    
21
setup_environ(settings)
22

    
23
from amqplib import client_0_8 as amqp
24
from signal import signal, SIGINT, SIGTERM
25

    
26
import logging
27
import logging.config
28
import time
29
import socket
30
from daemon import pidfile, daemon
31
import lockfile.pidlockfile
32

    
33
from synnefo.logic import dispatcher_callbacks
34

    
35

    
36
class Dispatcher:
37

    
38
    logger = None
39
    chan = None
40
    debug = False
41
    clienttags = []
42

    
43
    def __init__(self, debug = False):
44
        # Initialize logger
45
        logging.config.fileConfig("/Volumes/Files/Developer/grnet/synnefo/logging.conf")
46
        self.logger = logging.getLogger("synnefo.dispatcher")
47

    
48
        self.debug = debug
49
        self._init()
50

    
51
    def wait(self):
52
        while True:
53
            try:
54
                self.chan.wait()
55
            except SystemExit:
56
                break
57
            except amqp.exceptions.AMQPConnectionException:
58
                self.logger.error("Server went away, reconnecting...")
59
                self._init()
60
            except socket.error:
61
                self.logger.error("Server went away, reconnecting...")
62
                self._init()
63

    
64
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
65
        self.chan.connection.close()
66
        self.chan.close()
67

    
68
    def _init(self):
69
        self.logger.info("Initializing")
70
        
71
        # Connect to RabbitMQ
72
        conn = None
73
        while conn == None:
74
            self.logger.info("Attempting to connect to %s",
75
                             settings.RABBIT_HOST)
76
            try:
77
                conn = amqp.Connection(host=settings.RABBIT_HOST,
78
                                       userid=settings.RABBIT_USERNAME,
79
                                       password=settings.RABBIT_PASSWORD,
80
                                       virtual_host=settings.RABBIT_VHOST)
81
            except socket.error:
82
                time.sleep(1)
83

    
84
        self.logger.info("Connection succesful, opening channel")
85
        self.chan = conn.channel()
86

    
87
        # Declare queues and exchanges
88
        for exchange in settings.EXCHANGES:
89
            self.chan.exchange_declare(exchange=exchange, type="topic",
90
                                       durable=True, auto_delete=False)
91

    
92
        for queue in settings.QUEUES:
93
            self.chan.queue_declare(queue=queue, durable=True,
94
                                    exclusive=False, auto_delete=False)
95

    
96
        bindings = settings.BINDINGS
97

    
98
        # Special queue for debugging, should not appear in production
99
        if self.debug:
100
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
101
                                    exclusive=False, auto_delete=False)
102
            bindings += settings.BINDINGS_DEBUG
103

    
104
        # Bind queues to handler methods
105
        for binding in bindings:
106
            try:
107
                callback = getattr(dispatcher_callbacks, binding[3])
108
            except AttributeError:
109
                self.logger.error("Cannot find callback %s" % binding[3])
110
                continue
111

    
112
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
113
                                 routing_key=binding[2])
114
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
115
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
116
                              (binding[1], binding[2], binding[0], binding[3]))
117
            self.clienttags.append(tag)
118

    
119

    
120
def _exit_handler(signum, frame):
121
    """"Catch exit signal in children processes."""
122
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
123
    raise SystemExit
124

    
125

    
126
def _parent_handler(signum, frame):
127
    """"Catch exit signal in parent process and forward it to children."""
128
    global children
129
    print "Caught signal %d, sending kill signal to children" % signum
130
    [os.kill(pid, SIGTERM) for pid in children]
131

    
132

    
133
def child(cmdline):
134
    """The context of the child process"""
135

    
136
    # Cmd line argument parsing
137
    (opts, args) = parse_arguments(cmdline)
138
    disp = Dispatcher(debug = opts.debug)
139

    
140
    # Start the event loop
141
    disp.wait()
142

    
143

    
144
def parse_arguments(args):
145
    from optparse import OptionParser
146

    
147
    parser = OptionParser()
148
    parser.add_option("-d", "--debug", action="store_true", default=False,
149
                      dest="debug", help="Enable debug mode")
150
    parser.add_option("-l", "--log", dest="log_file",
151
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
152
                      help="Write log to FILE instead of %s" %
153
                           settings.DISPATCHER_LOG_FILE)
154
    parser.add_option("-c", "--cleanup-queues", action="store_true",
155
                      default=False, dest="cleanup_queues",
156
                      help="Remove all declared queues (DANGEROUS!)")
157
    parser.add_option("-w", "--workers", default=2, dest="workers",
158
                      help="Number of workers to spawn", type="int")
159
    
160
    return parser.parse_args(args)
161

    
162

    
163
def cleanup_queues() :
164
    """Delete declared queues from RabbitMQ. Use with care!"""
165
    conn = amqp.Connection( host=settings.RABBIT_HOST,
166
                            userid=settings.RABBIT_USERNAME,
167
                            password=settings.RABBIT_PASSWORD,
168
                            virtual_host=settings.RABBIT_VHOST)
169
    chan = conn.channel()
170

    
171
    print "Queues to be deleted: ",  settings.QUEUES
172
    print "Exchnages to be deleted: ", settings.EXCHANGES
173
    ans = raw_input("Are you sure (N/y):")
174

    
175
    if not ans:
176
        return
177
    if ans not in ['Y', 'y']:
178
        return
179

    
180
    #for exchange in settings.EXCHANGES:
181
    #    try:
182
    #        chan.exchange_delete(exchange=exchange)
183
    #    except amqp.exceptions.AMQPChannelException as e:
184
    #        print e.amqp_reply_code, " ", e.amqp_reply_text
185

    
186
    for queue in settings.QUEUES:
187
        try:
188
            chan.queue_delete(queue=queue)
189
        except amqp.exceptions.AMQPChannelException as e:
190
            print e.amqp_reply_code, " ", e.amqp_reply_text
191
    chan.close()
192
    chan.connection.close()
193

    
194

    
195
def debug_mode():
196
    disp = Dispatcher(debug = True)
197
    signal(SIGINT, _exit_handler)
198
    signal(SIGTERM, _exit_handler)
199

    
200
    disp.wait()
201

    
202

    
203
def main():
204
    global children, logger
205
    (opts, args) = parse_arguments(sys.argv[1:])
206

    
207
    # Initialize logger
208
    logging.config.fileConfig("logging.conf")
209
    logger = logging.getLogger("synnefo.dispatcher")
210

    
211
    # Special case for the clean up queues action
212
    if opts.cleanup_queues:
213
        cleanup_queues()
214
        return
215

    
216
    # Debug mode, process messages without spawning workers
217
    if opts.debug:
218
        debug_mode()
219
        return
220

    
221
    # Create pidfile
222
    pidf = pidfile.TimeoutPIDLockFile("/Volumes/Files/Developer/grnet/synnefo/dispatcher.pid", 10)
223
    pidf.acquire()
224
    pidf.__enter__()
225

    
226
    # Become a daemon
227
    daemon_context = daemon.DaemonContext(
228
        stdout=sys.stdout,
229
        stderr=sys.stderr,
230
        umask=022)
231

    
232
    daemon_context.open()
233
    logger.info("Became a daemon")
234

    
235
    # Fork workers
236
    children = []
237

    
238
    i = 0
239
    while i < opts.workers:
240
        newpid = os.fork()
241

    
242
        if newpid == 0:
243
            signal(SIGINT,  _exit_handler)
244
            signal(SIGTERM, _exit_handler)
245
            child(sys.argv[1:])
246
            sys.exit(1)
247
        else:
248
            pids = (os.getpid(), newpid)
249
            logger.debug("%d, forked child: %d" % pids)
250
            children.append(pids[1])
251
        i += 1
252

    
253
    # Catch signals to ensure graceful shutdown
254
    signal(SIGINT,  _parent_handler)
255
    signal(SIGTERM, _parent_handler)
256

    
257
    # Wait for all children processes to die, one by one
258
    for pid in children:
259
        try:
260
            os.waitpid(pid, 0)
261
        except Exception:
262
            pass
263

    
264
    pidf.release()
265
    pidf.__exit__()
266

    
267
if __name__ == "__main__":
268
    logging.basicConfig(level=logging.DEBUG)
269
    sys.exit(main())
270

    
271
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :