Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ f30730c0

History | View | Annotate | Download (7.6 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
from amqplib import client_0_8 as amqp
23

    
24
import json
25
import logging
26
import traceback
27
import time
28
import socket
29

    
30
import daemon
31
from signal import signal, SIGINT, SIGTERM, SIGKILL
32

    
33
from synnefo.db.models import VirtualMachine
34
from synnefo.logic import utils, backend
35

    
36
class Dispatcher:
37

    
38
    logger = None
39
    chan = None
40

    
41
    def __init__(self, debug = False, logger = None):
42
        self.logger = logger
43
        self._init_queues(debug)
44

    
45
    def update_db(self, message):
46
        try:
47
            msg = json.loads(message.body)
48

    
49
            if msg["type"] != "ganeti-op-status":
50
                self.logger.error("Message is of uknown type %s." % (msg["type"],))
51
                return
52

    
53
            vmid = utils.id_from_instance_name(msg["instance"])
54
            vm = VirtualMachine.objects.get(id=vmid)
55

    
56
            self.logger.debug("Processing msg: %s" % (msg,))
57
            backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
58
            self.logger.debug("Done processing msg for vm %s." % (msg["instance"]))
59

    
60
        except KeyError:
61
            self.logger.error("Malformed incoming JSON, missing attributes: " + message.body)
62
        except VirtualMachine.InvalidBackendIdError:
63
            self.logger.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
64
        except VirtualMachine.DoesNotExist:
65
            self.logger.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
66
        except Exception as e:
67
            self.logger.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
68
            return
69
        finally:
70
            message.channel.basic_ack(message.delivery_tag)
71

    
72
    def send_email(self, message):
73
        self.logger.debug("Request to send email message")
74
        message.channel.basic_ack(message.delivery_tag)
75

    
76
    def update_credits(self, message):
77
        self.logger.debug("Request to update credits")
78
        message.channel.basic_ack(message.delivery_tag)
79

    
80
    def wait(self):
81
        while True:
82
            try:
83
                self.chan.wait()
84
            except SystemExit:
85
                break
86

    
87
        self.chan.basic_cancel("dbupdater")
88
        self.chan.close()
89
        self.chan.connection.close()
90

    
91
    def _declare_queues(self):
92

    
93
        for exchange in settings.EXCHANGES:
94
            self.chan.exchange_declare(exchange=exchange, type="direct", durable=True, auto_delete=False)
95

    
96
        for queue in settings.QUEUES:
97
            self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
98

    
99
    def _init_queues(self,debug):
100
        self._open_channel()
101
        if debug:
102
            self._init_devel()
103
        else:
104
            self._init()
105

    
106
    def _init_devel(self):
107
        self._declare_queues()
108
        self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
109
        self.chan.basic_consume(queue="events", callback=self.update_db, consumer_tag="dbupdater")
110

    
111
    def _init(self):
112
        self._declare_queues()
113
        self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
114
        self.chan.basic_consume(queue="events", callback=self.update_db, consumer_tag="dbupdater")
115

    
116
    def _open_channel(self):
117
        conn = None
118
        while conn == None:
119
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
120
            try:
121
                conn = amqp.Connection( host=settings.RABBIT_HOST,
122
                                    userid=settings.RABBIT_USERNAME,
123
                                    password=settings.RABBIT_PASSWORD,
124
                                    virtual_host=settings.RABBIT_VHOST)
125
            except socket.error:
126
                time.sleep(1)
127
                pass
128

    
129
        self.logger.info("Connection succesful, opening channel")
130
        self.chan = conn.channel()
131

    
132
def exit_handler(signum, frame):
133
    global handler_logger
134

    
135
    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
136
    raise SystemExit
137

    
138
def child(cmdline):
139
    #Cmd line argument parsing
140
    (opts, args) = parse_arguments(cmdline)
141

    
142
    # Initialize logger
143
    lvl = logging.DEBUG if opts.debug else logging.INFO
144
    logger = logging.getLogger("okeanos.dispatcher")
145
    logger.setLevel(lvl)
146
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
147
            "%Y-%m-%d %H:%M:%S")
148
    handler = logging.FileHandler(opts.log_file)
149
    handler.setFormatter(formatter)
150
    logger.addHandler(handler)
151

    
152
    d = Dispatcher(debug = True, logger = logger)
153

    
154
    d.wait()
155

    
156
def parse_arguments(args):
157
    from optparse import OptionParser
158

    
159
    parser = OptionParser()
160
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
161
            help="Enable debug mode")
162
    parser.add_option("-l", "--log", dest="log_file",
163
            default=settings.DISPATCHER_LOG_FILE,
164
            metavar="FILE",
165
            help="Write log to FILE instead of %s" %
166
            settings.DISPATCHER_LOG_FILE)
167
    parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
168
            help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
169
    
170
    return parser.parse_args(args)
171

    
172
def cleanup_queues() :
173

    
174
    conn = amqp.Connection( host=settings.RABBIT_HOST,
175
                            userid=settings.RABBIT_USERNAME,
176
                            password=settings.RABBIT_PASSWORD,
177
                            virtual_host=settings.RABBIT_VHOST)
178
    chan = conn.channel()
179

    
180
    print "Queues to be deleted: ",  settings.QUEUES
181
    print "Exchnages to be deleted: ", settings.EXCHANGES
182
    ans = raw_input("Are you sure (N/y):")
183

    
184
    if not ans:
185
        return
186
    if ans not in ['Y', 'y']:
187
        return
188

    
189
    for exchange in settings.EXCHANGES:
190
        try:
191
            chan.exchange_delete(exchange=exchange)
192
        except amqp.exceptions.AMQPChannelException as e:
193
            print e.amqp_reply_code, " ", e.amqp_reply_text
194

    
195
    for queue in settings.QUEUES:
196
        try:
197
            chan.queue_delete(queue=queue)
198
        except amqp.exceptions.AMQPChannelException as e:
199
            print e.amqp_reply_code, " ", e.amqp_reply_text
200

    
201
def main():
202
    global logger
203
    (opts, args) = parse_arguments(sys.argv[1:])
204

    
205
    if opts.cleanup_queues:
206
        cleanup_queues()
207
        return
208

    
209
    #newpid = os.fork()
210
    #if newpid == 0:
211
    child(sys.argv[1:])
212
    #else:
213
    #    pids = (os.getpid(), newpid)
214
    #    print "parent: %d, child: %d" % pids
215

    
216
    # Become a daemon:
217
    # Redirect stdout and stderr to handler.stream to catch
218
    # early errors in the daemonization process [e.g., pidfile creation]
219
    # which will otherwise go to /dev/null.
220
    #daemon_context = daemon.DaemonContext(
221
    #        umask=022,
222
    #        stdout=handler.stream,
223
    #        stderr=handler.stream,
224
    #        files_preserve=[handler.stream])
225
    #daemon_context.open()
226
    #logger.info("Became a daemon")
227
    
228
    # Catch signals to ensure graceful shutdown
229
    #signal(SIGINT, exit_handler)
230
    #signal(SIGTERM, exit_handler)
231
    #signal(SIGKILL, exit_handler)
232

    
233
if __name__ == "__main__":
234
    logging.basicConfig(level=logging.DEBUG)
235
    sys.exit(main())
236

    
237
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :