Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher.py @ 6e8c2217

History | View | Annotate | Download (6.5 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
from amqplib import client_0_8 as amqp
23

    
24
import daemon
25
from signal import signal, SIGINT, SIGTERM, SIGKILL
26

    
27
import logging
28
import time
29
import socket
30

    
31
from synnefo.logic import dispatcher_callbacks
32

    
33
class Dispatcher:
34

    
35
    logger = None
36
    chan = None
37
    debug = False
38
    clienttags = []
39

    
40
    def __init__(self, debug = False, logger = None):
41
        self.logger = logger
42
        self.debug = debug
43
        self._init()
44

    
45
    def wait(self):
46
        while True:
47
            try:
48
                self.chan.wait()
49
            except SystemExit:
50
                break
51
            except socket.error:
52
                self.logger.error("Server went away, reconnecting...")
53
                self._init()
54
                pass
55

    
56
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
57
        self.chan.close()
58
        self.chan.connection.close()
59

    
60
    def _init(self):
61
        conn = None
62
        while conn == None:
63
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
64
            try:
65
                conn = amqp.Connection( host=settings.RABBIT_HOST,
66
                                    userid=settings.RABBIT_USERNAME,
67
                                    password=settings.RABBIT_PASSWORD,
68
                                    virtual_host=settings.RABBIT_VHOST)
69
            except socket.error:
70
                time.sleep(1)
71
                pass
72

    
73
        self.logger.info("Connection succesful, opening channel")
74
        self.chan = conn.channel()
75

    
76
        #Declare queues and exchanges
77
        for exchange in settings.EXCHANGES:
78
            self.chan.exchange_declare(exchange=exchange, type="topic", durable=True, auto_delete=False)
79

    
80
        for queue in settings.QUEUES:
81
            self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
82

    
83
        bindings = settings.BINDINGS
84

    
85
        if self.debug:
86
            #Special queue handling, should not appear in production
87
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, exclusive=False, auto_delete=False)
88
            bindings += settings.BINDINGS_DEBUG
89

    
90
        #Bind queues to handler methods
91
        for binding in bindings:
92
            try:
93
                cb = getattr(dispatcher_callbacks, binding[3])
94
            except AttributeError:
95
                self.logger.error("Cannot find callback %s" % binding[3])
96

    
97
            self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2])
98
            tag = self.chan.basic_consume(queue=binding[0], callback=cb)
99
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
100
                              (binding[1], binding[2], binding[0], binding[3]))
101
            self.clienttags.append(tag)
102

    
103
def exit_handler(signum, frame):
104
    global handler_logger
105

    
106
    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
107
    raise SystemExit
108

    
109
def child(cmdline):
110
    global logger
111
    #Cmd line argument parsing
112
    (opts, args) = parse_arguments(cmdline)
113

    
114
    # Initialize logger
115
    lvl = logging.DEBUG if opts.debug else logging.INFO
116
    logger = logging.getLogger("okeanos.dispatcher")
117
    logger.setLevel(lvl)
118
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
119
            "%Y-%m-%d %H:%M:%S")
120
    handler = logging.FileHandler(opts.log_file)
121
    handler.setFormatter(formatter)
122
    logger.addHandler(handler)
123

    
124
    d = Dispatcher(debug = opts.debug, logger = logger)
125

    
126
    d.wait()
127

    
128
def parse_arguments(args):
129
    from optparse import OptionParser
130

    
131
    parser = OptionParser()
132
    parser.add_option("-d", "--debug", action="store_true", default=False, dest="debug",
133
            help="Enable debug mode")
134
    parser.add_option("-l", "--log", dest="log_file",
135
            default=settings.DISPATCHER_LOG_FILE,
136
            metavar="FILE",
137
            help="Write log to FILE instead of %s" %
138
            settings.DISPATCHER_LOG_FILE)
139
    parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
140
            help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
141
    
142
    return parser.parse_args(args)
143

    
144
def cleanup_queues() :
145

    
146
    conn = amqp.Connection( host=settings.RABBIT_HOST,
147
                            userid=settings.RABBIT_USERNAME,
148
                            password=settings.RABBIT_PASSWORD,
149
                            virtual_host=settings.RABBIT_VHOST)
150
    chan = conn.channel()
151

    
152
    print "Queues to be deleted: ",  settings.QUEUES
153
    print "Exchnages to be deleted: ", settings.EXCHANGES
154
    ans = raw_input("Are you sure (N/y):")
155

    
156
    if not ans:
157
        return
158
    if ans not in ['Y', 'y']:
159
        return
160

    
161
    for exchange in settings.EXCHANGES:
162
        try:
163
            chan.exchange_delete(exchange=exchange)
164
        except amqp.exceptions.AMQPChannelException as e:
165
            print e.amqp_reply_code, " ", e.amqp_reply_text
166

    
167
    for queue in settings.QUEUES:
168
        try:
169
            chan.queue_delete(queue=queue)
170
        except amqp.exceptions.AMQPChannelException as e:
171
            print e.amqp_reply_code, " ", e.amqp_reply_text
172

    
173
def main():
174
    (opts, args) = parse_arguments(sys.argv[1:])
175

    
176
    if opts.cleanup_queues:
177
        cleanup_queues()
178
        return
179

    
180
    #newpid = os.fork()
181
    #if newpid == 0:
182
    child(sys.argv[1:])
183
    #else:
184
    #    pids = (os.getpid(), newpid)
185
    #    print "parent: %d, child: %d" % pids
186

    
187
    # Become a daemon:
188
    # Redirect stdout and stderr to handler.stream to catch
189
    # early errors in the daemonization process [e.g., pidfile creation]
190
    # which will otherwise go to /dev/null.
191
    #daemon_context = daemon.DaemonContext(
192
    #        umask=022,
193
    #        stdout=handler.stream,
194
    #        stderr=handler.stream,
195
    #        files_preserve=[handler.stream])
196
    #daemon_context.open()
197
    #logger.info("Became a daemon")
198
    
199
    # Catch signals to ensure graceful shutdown
200
    #signal(SIGINT, exit_handler)
201
    #signal(SIGTERM, exit_handler)
202
    #signal(SIGKILL, exit_handler)
203

    
204
if __name__ == "__main__":
205
    logging.basicConfig(level=logging.DEBUG)
206
    sys.exit(main())
207

    
208
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :