Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ 5d081749

History | View | Annotate | Download (8.1 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
from amqplib import client_0_8 as amqp
23

    
24
import json
25
import logging
26
import traceback
27
import time
28
import socket
29

    
30
import daemon
31
from signal import signal, SIGINT, SIGTERM, SIGKILL
32

    
33
from synnefo.db.models import VirtualMachine
34
from synnefo.logic import utils, backend
35

    
36
class Dispatcher:
37

    
38
    logger = None
39
    chan = None
40
    debug = False
41
    clienttags = []
42

    
43
    def __init__(self, debug = False, logger = None):
44
        self.logger = logger
45
        self.debug = debug
46
        self._init()
47

    
48
    def update_db(self, message):
49
        try:
50
            msg = json.loads(message.body)
51

    
52
            if msg["type"] != "ganeti-op-status":
53
                self.logger.error("Message is of uknown type %s." % (msg["type"],))
54
                return
55

    
56
            vmid = utils.id_from_instance_name(msg["instance"])
57
            vm = VirtualMachine.objects.get(id=vmid)
58

    
59
            self.logger.debug("Processing msg: %s" % (msg,))
60
            backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
61
            self.logger.debug("Done processing msg for vm %s." % (msg["instance"]))
62

    
63
        except KeyError:
64
            self.logger.error("Malformed incoming JSON, missing attributes: " + message.body)
65
        except VirtualMachine.InvalidBackendIdError:
66
            self.logger.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
67
        except VirtualMachine.DoesNotExist:
68
            self.logger.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
69
        except Exception as e:
70
            self.logger.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
71
            return
72
        finally:
73
            message.channel.basic_ack(message.delivery_tag)
74

    
75
    def send_email(self, message):
76
        self.logger.debug("Request to send email message")
77
        message.channel.basic_ack(message.delivery_tag)
78

    
79
    def update_credits(self, message):
80
        self.logger.debug("Request to update credits")
81
        message.channel.basic_ack(message.delivery_tag)
82

    
83
    def dummy_proc(self, message):
84
        try:
85
            msg = json.loads(message.body)
86
            self.logger.debug("Msg to %s (%s) " % message.channel, msg)
87
        finally:
88
            message.channel.basic_ack(message.delivery_tag)
89

    
90
    def wait(self):
91
        while True:
92
            try:
93
                self.chan.wait()
94
            except SystemExit:
95
                break
96
            except socket.error:
97
                self.logger.error("Server went away, reconnecting...")
98
                self._init()
99
                pass
100

    
101
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
102
        self.chan.close()
103
        self.chan.connection.close()
104

    
105
    def _init(self):
106
        self._open_channel()
107

    
108
        for exchange in settings.EXCHANGES:
109
            self.chan.exchange_declare(exchange=exchange, type="topic", durable=True, auto_delete=False)
110

    
111
        for queue in settings.QUEUES:
112
            self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
113

    
114
        bindings = None
115

    
116
        if self.debug:
117
            #Special queue handling, should not appear in production
118
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, exclusive=False, auto_delete=False)
119
            bindings = settings.BINDINGS_DEBUG
120
        else:
121
            bindings = settings.BINDINGS
122

    
123
        for binding in bindings:
124
            self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2])
125
            tag = self.chan.basic_consume(queue=binding[0], callback=binding[3])
126
            self.logger.debug("Binding %s on queue %s to %s" % (binding[2], binding[0], binding[3]))
127
            self.clienttags.append(tag)
128

    
129
    def _open_channel(self):
130
        conn = None
131
        while conn == None:
132
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
133
            try:
134
                conn = amqp.Connection( host=settings.RABBIT_HOST,
135
                                    userid=settings.RABBIT_USERNAME,
136
                                    password=settings.RABBIT_PASSWORD,
137
                                    virtual_host=settings.RABBIT_VHOST)
138
            except socket.error:
139
                time.sleep(1)
140
                pass
141

    
142
        self.logger.info("Connection succesful, opening channel")
143
        self.chan = conn.channel()
144

    
145
def exit_handler(signum, frame):
146
    global handler_logger
147

    
148
    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
149
    raise SystemExit
150

    
151
def child(cmdline):
152
    #Cmd line argument parsing
153
    (opts, args) = parse_arguments(cmdline)
154

    
155
    # Initialize logger
156
    lvl = logging.DEBUG if opts.debug else logging.INFO
157
    logger = logging.getLogger("okeanos.dispatcher")
158
    logger.setLevel(lvl)
159
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
160
            "%Y-%m-%d %H:%M:%S")
161
    handler = logging.FileHandler(opts.log_file)
162
    handler.setFormatter(formatter)
163
    logger.addHandler(handler)
164

    
165
    d = Dispatcher(debug = opts.debug, logger = logger)
166

    
167
    d.wait()
168

    
169
def parse_arguments(args):
170
    from optparse import OptionParser
171

    
172
    parser = OptionParser()
173
    parser.add_option("-d", "--debug", action="store_false", default=False, dest="debug",
174
            help="Enable debug mode")
175
    parser.add_option("-l", "--log", dest="log_file",
176
            default=settings.DISPATCHER_LOG_FILE,
177
            metavar="FILE",
178
            help="Write log to FILE instead of %s" %
179
            settings.DISPATCHER_LOG_FILE)
180
    parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
181
            help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
182
    
183
    return parser.parse_args(args)
184

    
185
def cleanup_queues() :
186

    
187
    conn = amqp.Connection( host=settings.RABBIT_HOST,
188
                            userid=settings.RABBIT_USERNAME,
189
                            password=settings.RABBIT_PASSWORD,
190
                            virtual_host=settings.RABBIT_VHOST)
191
    chan = conn.channel()
192

    
193
    print "Queues to be deleted: ",  settings.QUEUES
194
    print "Exchnages to be deleted: ", settings.EXCHANGES
195
    ans = raw_input("Are you sure (N/y):")
196

    
197
    if not ans:
198
        return
199
    if ans not in ['Y', 'y']:
200
        return
201

    
202
    for exchange in settings.EXCHANGES:
203
        try:
204
            chan.exchange_delete(exchange=exchange)
205
        except amqp.exceptions.AMQPChannelException as e:
206
            print e.amqp_reply_code, " ", e.amqp_reply_text
207

    
208
    for queue in settings.QUEUES:
209
        try:
210
            chan.queue_delete(queue=queue)
211
        except amqp.exceptions.AMQPChannelException as e:
212
            print e.amqp_reply_code, " ", e.amqp_reply_text
213

    
214
def main():
215
    global logger
216
    (opts, args) = parse_arguments(sys.argv[1:])
217

    
218
    if opts.cleanup_queues:
219
        cleanup_queues()
220
        return
221

    
222
    #newpid = os.fork()
223
    #if newpid == 0:
224
    child(sys.argv[1:])
225
    #else:
226
    #    pids = (os.getpid(), newpid)
227
    #    print "parent: %d, child: %d" % pids
228

    
229
    # Become a daemon:
230
    # Redirect stdout and stderr to handler.stream to catch
231
    # early errors in the daemonization process [e.g., pidfile creation]
232
    # which will otherwise go to /dev/null.
233
    #daemon_context = daemon.DaemonContext(
234
    #        umask=022,
235
    #        stdout=handler.stream,
236
    #        stderr=handler.stream,
237
    #        files_preserve=[handler.stream])
238
    #daemon_context.open()
239
    #logger.info("Became a daemon")
240
    
241
    # Catch signals to ensure graceful shutdown
242
    #signal(SIGINT, exit_handler)
243
    #signal(SIGTERM, exit_handler)
244
    #signal(SIGKILL, exit_handler)
245

    
246
if __name__ == "__main__":
247
    logging.basicConfig(level=logging.DEBUG)
248
    sys.exit(main())
249

    
250
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :