Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 4ed2e471

History | View | Annotate | Download (9.6 kB)

1
#!/usr/bin/env python
2
# Copyright 2011 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

    
31

    
32
""" Message queue setup and dispatch
33

34
This program sets up connections to the queues configured in settings.py
35
and implements the message wait and dispatch loops. Actual messages are
36
handled in the dispatched functions.
37

38
"""
39

    
40
from django.core.management import setup_environ
41

    
42
import sys
43
import os
44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45
sys.path.append(path)
46
import synnefo.settings as settings
47
from synnefo.logic import log
48

    
49
setup_environ(settings)
50

    
51
from amqplib import client_0_8 as amqp
52
from signal import signal, SIGINT, SIGTERM
53

    
54
import time
55
import socket
56
from daemon import daemon
57

    
58
# Take care of differences between python-daemon versions.
59
try:
60
    from daemon import pidfile
61
except:
62
    from daemon import pidlockfile
63

    
64
from synnefo.logic import callbacks
65

    
66
class Dispatcher:
67

    
68
    logger = None
69
    chan = None
70
    debug = False
71
    clienttags = []
72

    
73
    def __init__(self, debug = False):
74
        
75
        # Initialize logger
76
        self.logger = log.get_logger('synnefo.dispatcher')
77

    
78
        self.debug = debug
79
        self._init()
80

    
81
    def wait(self):
82
        while True:
83
            try:
84
                self.chan.wait()
85
            except SystemExit:
86
                break
87
            except amqp.exceptions.AMQPConnectionException:
88
                self.logger.error("Server went away, reconnecting...")
89
                self._init()
90
            except socket.error:
91
                self.logger.error("Server went away, reconnecting...")
92
                self._init()
93

    
94
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
95
        self.chan.connection.close()
96
        self.chan.close()
97

    
98
    def _init(self):
99
        self.logger.info("Initializing")
100
        
101
        # Connect to RabbitMQ
102
        conn = None
103
        while conn == None:
104
            self.logger.info("Attempting to connect to %s",
105
                             settings.RABBIT_HOST)
106
            try:
107
                conn = amqp.Connection(host=settings.RABBIT_HOST,
108
                                       userid=settings.RABBIT_USERNAME,
109
                                       password=settings.RABBIT_PASSWORD,
110
                                       virtual_host=settings.RABBIT_VHOST)
111
            except socket.error:
112
                time.sleep(1)
113

    
114
        self.logger.info("Connection succesful, opening channel")
115
        self.chan = conn.channel()
116

    
117
        # Declare queues and exchanges
118
        for exchange in settings.EXCHANGES:
119
            self.chan.exchange_declare(exchange=exchange, type="topic",
120
                                       durable=True, auto_delete=False)
121

    
122
        for queue in settings.QUEUES:
123
            self.chan.queue_declare(queue=queue, durable=True,
124
                                    exclusive=False, auto_delete=False)
125

    
126
        bindings = settings.BINDINGS
127

    
128
        # Special queue for debugging, should not appear in production
129
        if self.debug:
130
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
131
                                    exclusive=False, auto_delete=False)
132
            bindings += settings.BINDINGS_DEBUG
133

    
134
        # Bind queues to handler methods
135
        for binding in bindings:
136
            try:
137
                callback = getattr(callbacks, binding[3])
138
            except AttributeError:
139
                self.logger.error("Cannot find callback %s" % binding[3])
140
                continue
141

    
142
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
143
                                 routing_key=binding[2])
144
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
145
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
146
                              (binding[1], binding[2], binding[0], binding[3]))
147
            self.clienttags.append(tag)
148

    
149

    
150
def _exit_handler(signum, frame):
151
    """"Catch exit signal in children processes."""
152
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
153
    raise SystemExit
154

    
155

    
156
def _parent_handler(signum, frame):
157
    """"Catch exit signal in parent process and forward it to children."""
158
    global children
159
    print "Caught signal %d, sending kill signal to children" % signum
160
    [os.kill(pid, SIGTERM) for pid in children]
161

    
162

    
163
def child(cmdline):
164
    """The context of the child process"""
165

    
166
    # Cmd line argument parsing
167
    (opts, args) = parse_arguments(cmdline)
168
    disp = Dispatcher(debug = opts.debug)
169

    
170
    # Start the event loop
171
    disp.wait()
172

    
173

    
174
def parse_arguments(args):
175
    from optparse import OptionParser
176

    
177
    parser = OptionParser()
178
    parser.add_option("-d", "--debug", action="store_true", default=False,
179
                      dest="debug", help="Enable debug mode")
180
    parser.add_option("-c", "--cleanup-queues", action="store_true",
181
                      default=False, dest="cleanup_queues",
182
                      help="Remove all declared queues (DANGEROUS!)")
183
    parser.add_option("-w", "--workers", default=2, dest="workers",
184
                      help="Number of workers to spawn", type="int")
185
    parser.add_option("-p", '--pid-file', dest="pid_file",
186
                      default=os.path.join(os.getcwd(), "dispatcher.pid"),
187
                      help="Save PID to file (default:%s)" %
188
                           os.path.join(os.getcwd(), "dispatcher.pid"))
189

    
190
    return parser.parse_args(args)
191

    
192

    
193
def cleanup_queues() :
194
    """Delete declared queues from RabbitMQ. Use with care!"""
195
    conn = amqp.Connection( host=settings.RABBIT_HOST,
196
                            userid=settings.RABBIT_USERNAME,
197
                            password=settings.RABBIT_PASSWORD,
198
                            virtual_host=settings.RABBIT_VHOST)
199
    chan = conn.channel()
200

    
201
    print "Queues to be deleted: ",  settings.QUEUES
202
    print "Exchnages to be deleted: ", settings.EXCHANGES
203
    ans = raw_input("Are you sure (N/y):")
204

    
205
    if not ans:
206
        return
207
    if ans not in ['Y', 'y']:
208
        return
209

    
210
    #for exchange in settings.EXCHANGES:
211
    #    try:
212
    #        chan.exchange_delete(exchange=exchange)
213
    #    except amqp.exceptions.AMQPChannelException as e:
214
    #        print e.amqp_reply_code, " ", e.amqp_reply_text
215

    
216
    for queue in settings.QUEUES:
217
        try:
218
            chan.queue_delete(queue=queue)
219
        except amqp.exceptions.AMQPChannelException as e:
220
            print e.amqp_reply_code, " ", e.amqp_reply_text
221
    chan.close()
222
    chan.connection.close()
223

    
224

    
225
def debug_mode():
226
    disp = Dispatcher(debug = True)
227
    signal(SIGINT, _exit_handler)
228
    signal(SIGTERM, _exit_handler)
229

    
230
    disp.wait()
231

    
232

    
233
def main():
234
    global children, logger
235
    (opts, args) = parse_arguments(sys.argv[1:])
236

    
237
    logger = log.get_logger("synnefo.dispatcher")
238

    
239
    # Special case for the clean up queues action
240
    if opts.cleanup_queues:
241
        cleanup_queues()
242
        return
243

    
244
    # Debug mode, process messages without spawning workers
245
    if opts.debug:
246
        debug_mode()
247
        return
248

    
249
    # Become a daemon
250
    daemon_context = daemon.DaemonContext(
251
        stdout=sys.stdout,
252
        stderr=sys.stderr,
253
        umask=022)
254

    
255
    daemon_context.open()
256

    
257
    # Create pidfile. Take care of differences between python-daemon versions.
258
    try:
259
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
260
    except:
261
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
262

    
263
    pidf.acquire()
264

    
265
    logger.info("Became a daemon")
266

    
267
    # Fork workers
268
    children = []
269

    
270
    i = 0
271
    while i < opts.workers:
272
        newpid = os.fork()
273

    
274
        if newpid == 0:
275
            signal(SIGINT,  _exit_handler)
276
            signal(SIGTERM, _exit_handler)
277
            child(sys.argv[1:])
278
            sys.exit(1)
279
        else:
280
            pids = (os.getpid(), newpid)
281
            logger.debug("%d, forked child: %d" % pids)
282
            children.append(pids[1])
283
        i += 1
284

    
285
    # Catch signals to ensure graceful shutdown
286
    signal(SIGINT,  _parent_handler)
287
    signal(SIGTERM, _parent_handler)
288

    
289
    # Wait for all children processes to die, one by one
290
    try :
291
        for pid in children:
292
            try:
293
                os.waitpid(pid, 0)
294
            except Exception:
295
                pass
296
    finally:
297
        pidf.release()
298

    
299
if __name__ == "__main__":
300
    sys.exit(main())
301

    
302
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :