Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 8861126f

History | View | Annotate | Download (6.7 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 201! Greek Research and Technology Network
4
#
5
"""Connect to a queue 
6

7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
from amqplib import client_0_8 as amqp
23

    
24
from signal import signal, SIGINT, SIGTERM
25

    
26
import logging
27
import time
28
import socket
29

    
30
from synnefo.logic import dispatcher_callbacks
31

    
32
#List of worker ids
33
global children
34

    
35
class Dispatcher:
36

    
37
    logger = None
38
    chan = None
39
    debug = False
40
    clienttags = []
41

    
42
    def __init__(self, debug = False, logger = None):
43
        self.logger = logger
44
        self.debug = debug
45
        self._init()
46

    
47
    def wait(self):
48
        while True:
49
            try:
50
                self.chan.wait()
51
            except SystemExit:
52
                break
53
            except socket.error:
54
                self.logger.error("Server went away, reconnecting...")
55
                self._init()
56
                pass
57

    
58
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
59
        self.chan.close()
60
        self.chan.connection.close()
61
        sys.exit()
62

    
63
    def _init(self):
64
        conn = None
65
        while conn == None:
66
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
67
            try:
68
                conn = amqp.Connection( host=settings.RABBIT_HOST,
69
                                    userid=settings.RABBIT_USERNAME,
70
                                    password=settings.RABBIT_PASSWORD,
71
                                    virtual_host=settings.RABBIT_VHOST)
72
            except socket.error:
73
                time.sleep(1)
74
                pass
75

    
76
        self.logger.info("Connection succesful, opening channel")
77
        self.chan = conn.channel()
78

    
79
        #Declare queues and exchanges
80
        for exchange in settings.EXCHANGES:
81
            self.chan.exchange_declare(exchange=exchange, type="topic", durable=True, auto_delete=False)
82

    
83
        for queue in settings.QUEUES:
84
            self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
85

    
86
        bindings = settings.BINDINGS
87

    
88
        if self.debug:
89
            #Special queue handling, should not appear in production
90
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, exclusive=False, auto_delete=False)
91
            bindings += settings.BINDINGS_DEBUG
92

    
93
        #Bind queues to handler methods
94
        for binding in bindings:
95
            try:
96
                cb = getattr(dispatcher_callbacks, binding[3])
97
            except AttributeError:
98
                self.logger.error("Cannot find callback %s" % binding[3])
99

    
100
            self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2])
101
            tag = self.chan.basic_consume(queue=binding[0], callback=cb)
102
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
103
                              (binding[1], binding[2], binding[0], binding[3]))
104
            self.clienttags.append(tag)
105

    
106
def exit_handler(signum, frame):
107
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(),signum)
108
    raise SystemExit
109

    
110
def parent_handler(signum, frame):
111
    global children
112
    print "Caught signal %d, sending kill signal to children" % signum
113
    [os.kill(pid, SIGTERM) for pid in children]
114

    
115
def child(cmdline, logger):
116
    #Cmd line argument parsing
117
    (opts, args) = parse_arguments(cmdline)
118
    d = Dispatcher(debug = opts.debug, logger = logger)
119

    
120
    d.wait()
121

    
122
def parse_arguments(args):
123
    from optparse import OptionParser
124

    
125
    parser = OptionParser()
126
    parser.add_option("-d", "--debug", action="store_true", default=False, dest="debug",
127
            help="Enable debug mode")
128
    parser.add_option("-l", "--log", dest="log_file",
129
            default=settings.DISPATCHER_LOG_FILE,
130
            metavar="FILE",
131
            help="Write log to FILE instead of %s" %
132
            settings.DISPATCHER_LOG_FILE)
133
    parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
134
            help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
135
    parser.add_option("-w", "--workers", default=1, dest="workers",
136
            help="Number of workers to spawn")
137
    
138
    return parser.parse_args(args)
139

    
140
def cleanup_queues() :
141

    
142
    conn = amqp.Connection( host=settings.RABBIT_HOST,
143
                            userid=settings.RABBIT_USERNAME,
144
                            password=settings.RABBIT_PASSWORD,
145
                            virtual_host=settings.RABBIT_VHOST)
146
    chan = conn.channel()
147

    
148
    print "Queues to be deleted: ",  settings.QUEUES
149
    print "Exchnages to be deleted: ", settings.EXCHANGES
150
    ans = raw_input("Are you sure (N/y):")
151

    
152
    if not ans:
153
        return
154
    if ans not in ['Y', 'y']:
155
        return
156

    
157
    for exchange in settings.EXCHANGES:
158
        try:
159
            chan.exchange_delete(exchange=exchange)
160
        except amqp.exceptions.AMQPChannelException as e:
161
            print e.amqp_reply_code, " ", e.amqp_reply_text
162

    
163
    for queue in settings.QUEUES:
164
        try:
165
            chan.queue_delete(queue=queue)
166
        except amqp.exceptions.AMQPChannelException as e:
167
            print e.amqp_reply_code, " ", e.amqp_reply_text
168
    chan.close()
169
    chan.connection.close()
170

    
171
def main():
172
    global children, logger
173
    (opts, args) = parse_arguments(sys.argv[1:])
174

    
175
    # Initialize logger
176
    lvl = logging.DEBUG if opts.debug else logging.INFO
177
    logger = logging.getLogger("synnefo.dispatcher")
178
    logger.setLevel(lvl)
179
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
180
            "%Y-%m-%d %H:%M:%S")
181
    handler = logging.FileHandler(opts.log_file)
182
    handler.setFormatter(formatter)
183
    logger.addHandler(handler)
184

    
185
    #Special case for the clean up queues action
186
    if opts.cleanup_queues:
187
        cleanup_queues()
188
        return
189

    
190
    #Fork workers
191
    children = []
192

    
193
    i = 0
194
    while i < opts.workers:
195
        newpid = os.fork()
196

    
197
        if newpid == 0:
198
            signal(SIGINT, exit_handler)
199
            signal(SIGTERM, exit_handler)
200
            #child(sys.argv[1:], logger)
201
            time.sleep(5)
202
            sys.exit(0)
203
        else:
204
            pids = (os.getpid(), newpid)
205
            logger.debug("%d, forked child: %d" % pids)
206
            children.append(pids[1])
207
        i += 1
208

    
209
    # Catch signals to ensure graceful shutdown
210
    signal(SIGINT,  parent_handler)
211
    signal(SIGTERM, parent_handler)
212

    
213
    try:
214
        os.wait()
215
    except Exception :
216
        pass
217

    
218
if __name__ == "__main__":
219
    logging.basicConfig(level=logging.DEBUG)
220
    sys.exit(main())
221

    
222
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :