Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ 8d8ea051

History | View | Annotate | Download (5.6 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
from amqplib import client_0_8 as amqp
23

    
24
import json
25
import logging
26
import traceback
27
import time
28
import socket
29

    
30
import daemon
31
from signal import signal, SIGINT, SIGTERM, SIGKILL
32

    
33
from synnefo.db.models import VirtualMachine
34
from synnefo.logic import utils, backend
35

    
36
logger = None
37

    
38
def update_db(message):
39
    try:
40
        msg = json.loads(message.body)
41

    
42
        if msg["type"] != "ganeti-op-status":
43
            logging.error("Message is of uknown type %s." % (msg["type"],))
44
            return
45

    
46
        vmid = utils.id_from_instance_name(msg["instance"])
47
        vm = VirtualMachine.objects.get(id=vmid)
48

    
49
        logging.debug("Processing msg: %s" % (msg,))
50
        backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
51
        logging.debug("Done processing msg for vm %s." % (msg["instance"]))
52

    
53
    except KeyError:
54
        logging.error("Malformed incoming JSON, missing attributes: " + message.body)
55
    except VirtualMachine.InvalidBackendIdError:
56
        logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
57
    except VirtualMachine.DoesNotExist:
58
        logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
59
    except Exception as e:
60
        logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
61
        return
62
    finally:
63
        message.channel.basic_ack(message.delivery_tag)
64

    
65
def send_email(message):
66
    logger.debug("Request to send email message")
67
    message.channel.basic_ack(message.delivery_tag)
68

    
69
def update_credits(message):
70
    logger.debug("Request to update credits")
71
    message.channel.basic_ack(message.delivery_tag)
72

    
73
def declare_queues(chan):
74
    chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="topic", durable=True, auto_delete=False)
75
    chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False)
76

    
77
def init_devel():
78
    chan = open_channel()
79
    declare_queues(chan)
80
    chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
81
    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
82
    return chan
83

    
84
def init():
85
    chan = open_channel()
86
    declare_queues(chan)
87
    chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
88
    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
89
    return chan
90

    
91
def parse_arguments(args):
92
    from optparse import OptionParser
93

    
94
    parser = OptionParser()
95
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
96
            help="Enable debug mode")
97
    parser.add_option("-l", "--log", dest="log_file",
98
            default=settings.DISPATCHER_LOG_FILE,
99
            metavar="FILE",
100
            help="Write log to FILE instead of %s" %
101
            settings.DISPATCHER_LOG_FILE)
102

    
103
    return parser.parse_args(args)
104

    
105
def exit_handler(signum, frame):
106
    global handler_logger
107

    
108
    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
109
    raise SystemExit
110

    
111
def init_queues(debug):
112
    chan = None
113
    if debug:
114
        chan = init_devel()
115
    else:
116
        chan = init()
117
    return chan
118

    
119
def open_channel():
120
    conn = None
121
    while conn == None:
122
        logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
123
        try:
124
            conn = amqp.Connection( host=settings.RABBIT_HOST,
125
                                    userid=settings.RABBIT_USERNAME,
126
                                    password=settings.RABBIT_PASSWORD,
127
                                    virtual_host=settings.RABBIT_VHOST)
128
        except socket.error:
129
            time.sleep(1)
130
            pass
131

    
132
    logger.info("Connection succesful, opening channel")
133
    return conn.channel()
134

    
135
def main():
136
    global logger
137
    (opts, args) = parse_arguments(sys.argv[1:])
138

    
139
    # Initialize logger
140
    lvl = logging.DEBUG if opts.debug else logging.INFO
141
    logger = logging.getLogger("okeanos.dispatcher")
142
    logger.setLevel(lvl)
143
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
144
            "%Y-%m-%d %H:%M:%S")
145
    handler = logging.FileHandler(opts.log_file)
146
    handler.setFormatter(formatter)
147
    logger.addHandler(handler)
148

    
149
    #Init the queues
150
    chan = init_queues(opts.debug)
151

    
152
    # Become a daemon:
153
    # Redirect stdout and stderr to handler.stream to catch
154
    # early errors in the daemonization process [e.g., pidfile creation]
155
    # which will otherwise go to /dev/null.
156
    daemon_context = daemon.DaemonContext(
157
            umask=022,
158
            stdout=handler.stream,
159
            stderr=handler.stream,
160
            files_preserve=[handler.stream])
161
    daemon_context.open()
162
    logger.info("Became a daemon")
163
    
164
    # Catch signals to ensure graceful shutdown
165
    signal(SIGINT, exit_handler)
166
    signal(SIGTERM, exit_handler)
167
    signal(SIGKILL, exit_handler)
168

    
169
    while True:
170
        try:
171
            chan.wait()
172
        except SystemExit:
173
            break
174

    
175
    chan.basic_cancel("dbupdater")
176
    chan.close()
177
    chan.connection.close()
178

    
179
if __name__ == "__main__":
180
    logging.basicConfig(level=logging.DEBUG)
181
    sys.exit(main())
182

    
183
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :