Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ 78e2d194

History | View | Annotate | Download (6.9 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
from amqplib import client_0_8 as amqp
23

    
24
import json
25
import logging
26
import traceback
27
import time
28
import socket
29

    
30
import daemon
31
from signal import signal, SIGINT, SIGTERM, SIGKILL
32

    
33
from synnefo.db.models import VirtualMachine
34
from synnefo.logic import utils, backend
35

    
36
class Dispatcher:
    """Consume Ganeti job notifications from RabbitMQ and update VM state.

    A single AMQP channel is opened on construction; wait() then blocks,
    dispatching incoming messages to the registered callbacks until a
    SystemExit is raised (e.g. from a signal handler).
    """

    logger = None   # logging.Logger, injected by the caller
    chan = None     # AMQP channel, created by _open_channel()

    def __init__(self, debug, logger):
        self.logger = logger
        self._init_queues(debug)

    def update_db(self, message):
        """Handle one ganeti-op-status message: update the matching VM row.

        The message is always acked (see the finally clause) so that a
        malformed or unprocessable message is not redelivered forever.
        """
        try:
            msg = json.loads(message.body)

            if msg["type"] != "ganeti-op-status":
                # Log-message typo fixed ("uknown" -> "unknown").
                self.logger.error("Message is of unknown type %s." % (msg["type"],))
                return

            vmid = utils.id_from_instance_name(msg["instance"])
            vm = VirtualMachine.objects.get(id=vmid)

            self.logger.debug("Processing msg: %s" % (msg,))
            backend.process_backend_msg(vm, msg["jobId"], msg["operation"],
                                        msg["status"], msg["logmsg"])
            self.logger.debug("Done processing msg for vm %s." % (msg["instance"]))
        except KeyError:
            self.logger.error("Malformed incoming JSON, missing attributes: " +
                              message.body)
        except VirtualMachine.InvalidBackendIdError:
            self.logger.debug("Ignoring msg for unknown instance %s." %
                              (msg["instance"],))
        except VirtualMachine.DoesNotExist:
            self.logger.error("VM for instance %s with id %d not found in DB." %
                              (msg["instance"], vmid))
        except Exception:
            # Catch-all boundary: log the full traceback but never let an
            # unexpected error kill the consumer loop.
            self.logger.error("Unexpected error:\n" +
                              "".join(traceback.format_exception(*sys.exc_info())))
        finally:
            message.channel.basic_ack(message.delivery_tag)

    def send_email(self, message):
        # Placeholder: acknowledge the request without acting on it yet.
        self.logger.debug("Request to send email message")
        message.channel.basic_ack(message.delivery_tag)

    def update_credits(self, message):
        # Placeholder: acknowledge the request without acting on it yet.
        self.logger.debug("Request to update credits")
        message.channel.basic_ack(message.delivery_tag)

    def wait(self):
        """Block consuming messages until SystemExit, then shut down cleanly."""
        while True:
            try:
                self.chan.wait()
            except SystemExit:
                break

        self.chan.basic_cancel("dbupdater")
        self.chan.close()
        self.chan.connection.close()

    def _declare_queues(self):
        """Declare all exchanges and queues used by the dispatcher (idempotent)."""
        self.chan.exchange_declare(exchange=settings.EXCHANGE_GANETI,
                                   type="direct", durable=True, auto_delete=False)
        self.chan.exchange_declare(exchange=settings.EXCHANGE_CRON,
                                   type="topic", durable=True, auto_delete=False)
        self.chan.exchange_declare(exchange=settings.EXCHANGE_API,
                                   type="topic", durable=True, auto_delete=False)

        self.chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS,
                                durable=True, exclusive=False, auto_delete=False)
        self.chan.queue_declare(queue=settings.QUEUE_CRON_CREDITS,
                                durable=True, exclusive=False, auto_delete=False)
        self.chan.queue_declare(queue=settings.QUEUE_API_EMAIL,
                                durable=True, exclusive=False, auto_delete=False)
        self.chan.queue_declare(queue=settings.QUEUE_CRON_EMAIL,
                                durable=True, exclusive=False, auto_delete=False)

    def _init_queues(self, debug):
        self._open_channel()
        if debug:
            self._init_devel()
        else:
            self._init()

    def _consume_ganeti_events(self):
        """Bind the Ganeti events queue and start consuming from it.

        Bug fix: consume from settings.QUEUE_GANETI_EVENTS -- the queue that
        was actually declared and bound above -- instead of the hard-coded
        queue name "events", which matched nothing we set up.
        """
        self._declare_queues()
        self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS,
                             exchange=settings.EXCHANGE_GANETI,
                             routing_key="event.*")
        self.chan.basic_consume(queue=settings.QUEUE_GANETI_EVENTS,
                                callback=self.update_db,
                                consumer_tag="dbupdater")

    def _init_devel(self):
        # Development setup; currently identical to production (_init),
        # kept as a separate hook so the two can diverge later.
        self._consume_ganeti_events()

    def _init(self):
        # Production setup.
        self._consume_ganeti_events()

    def _open_channel(self):
        """Connect to RabbitMQ, retrying every second, and open a channel."""
        conn = None
        while conn is None:
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                # Broker not reachable yet; back off briefly and retry forever.
                time.sleep(1)

        # Log-message typo fixed ("succesful" -> "successful").
        self.logger.info("Connection successful, opening channel")
        self.chan = conn.channel()
133

    
134
def exit_handler(signum, frame):
    """Translate a fatal signal into SystemExit so wait() can shut down cleanly."""
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
    raise SystemExit()
139

    
140
def child(cmdline):
    """Parse *cmdline*, set up file logging, and run the dispatcher loop.

    Does not return under normal operation: Dispatcher.wait() blocks until
    a SystemExit is raised.
    """
    # Cmd line argument parsing
    (opts, args) = parse_arguments(cmdline)

    # Initialize logger: log to opts.log_file, level controlled by -d/--debug.
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("okeanos.dispatcher")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
            "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
            "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    # Bug fix: honour the -d/--debug flag; the dispatcher was previously
    # constructed with debug=True unconditionally, forcing the devel queue
    # setup even in production runs.
    d = Dispatcher(debug=opts.debug, logger=logger)

    d.wait()
157

    
158
def parse_arguments(args):
    """Return the (options, positional_args) pair parsed from *args*."""
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option(
            "-d", "--debug",
            action="store_true", dest="debug",
            help="Enable debug mode")
    parser.add_option(
            "-l", "--log",
            dest="log_file",
            default=settings.DISPATCHER_LOG_FILE,
            metavar="FILE",
            help="Write log to FILE instead of %s" %
            settings.DISPATCHER_LOG_FILE)

    return parser.parse_args(args)
171

    
172
def main():
    """Entry point: parse arguments and run the dispatcher in the foreground.

    Daemonization and signal handling are work in progress; the relevant
    code is kept below, commented out.
    """
    global logger
    # NOTE(review): the parsed result is unused here -- child() re-parses the
    # same argv itself; this call only fails fast on bad arguments.
    (opts, args) = parse_arguments(sys.argv[1:])

    #newpid = os.fork()
    #if newpid == 0:
    child(sys.argv[1:])
    #else:
    #    pids = (os.getpid(), newpid)
    #    print "parent: %d, child: %d" % pids

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    #daemon_context = daemon.DaemonContext(
    #        umask=022,
    #        stdout=handler.stream,
    #        stderr=handler.stream,
    #        files_preserve=[handler.stream])
    #daemon_context.open()
    #logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    #signal(SIGINT, exit_handler)
    #signal(SIGTERM, exit_handler)
    # NOTE(review): SIGKILL cannot be caught or handled; registering a
    # handler for it raises at runtime -- drop this line when enabling.
    #signal(SIGKILL, exit_handler)
199

    
200
if __name__ == "__main__":
    # Fallback console logging at DEBUG; child() attaches its own file handler.
    logging.basicConfig(level=logging.DEBUG)
    sys.exit(main())
203

    
204
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :