Revision 8d8ea051 db/db_controller.py

b/db/db_controller.py
24 24
import json
25 25
import logging
26 26
import traceback
27
import time
28
import socket
29

  
30
import daemon
31
from signal import signal, SIGINT, SIGTERM, SIGKILL
27 32

  
28 33
from synnefo.db.models import VirtualMachine
29 34
from synnefo.logic import utils, backend
30 35

  
36
logger = None
37

  
31 38
def update_db(message):
32
    logging.debug("Received message from RabbitMQ")
33 39
    try:
34 40
        msg = json.loads(message.body)
35 41

  
36 42
        if msg["type"] != "ganeti-op-status":
37
            logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
43
            logging.error("Message is of uknown type %s." % (msg["type"],))
38 44
            return
39 45

  
40 46
        vmid = utils.id_from_instance_name(msg["instance"])
......
45 51
        logging.debug("Done processing msg for vm %s." % (msg["instance"]))
46 52

  
47 53
    except KeyError:
48
        logging.error("Malformed incoming JSON, missing attributes: " + message_data)
54
        logging.error("Malformed incoming JSON, missing attributes: " + message.body)
49 55
    except VirtualMachine.InvalidBackendIdError:
50 56
        logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
51 57
    except VirtualMachine.DoesNotExist:
......
56 62
    finally:
57 63
        message.channel.basic_ack(message.delivery_tag)
58 64

  
59
def main():
65
def send_email(message):
66
    logger.debug("Request to send email message")
67
    message.channel.basic_ack(message.delivery_tag)
68

  
69
def update_credits(message):
70
    logger.debug("Request to update credits")
71
    message.channel.basic_ack(message.delivery_tag)
72

  
73
def declare_queues(chan):
74
    chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="topic", durable=True, auto_delete=False)
75
    chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False)
60 76

  
61
    conn = amqp.Connection( host=settings.RABBIT_HOST,
62
                            userid=settings.RABBIT_USERNAME,
63
                            password=settings.RABBIT_PASSWORD,
64
                            virtual_host=settings.RABBIT_VHOST)
65
    chan = conn.channel()
66
    chan.queue_declare(queue="events", durable=True, exclusive=False, auto_delete=False)
67
    chan.exchange_declare(exchange="ganeti", type="direct", durable=True,
68
            auto_delete=False)
69
    chan.queue_bind(queue="events", exchange="ganeti", routing_key="eventd")
77
def init_devel():
78
    chan = open_channel()
79
    declare_queues(chan)
80
    chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
70 81
    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
82
    return chan
83

  
84
def init():
85
    chan = open_channel()
86
    declare_queues(chan)
87
    chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
88
    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
89
    return chan
90

  
91
def parse_arguments(args):
92
    from optparse import OptionParser
93

  
94
    parser = OptionParser()
95
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
96
            help="Enable debug mode")
97
    parser.add_option("-l", "--log", dest="log_file",
98
            default=settings.DISPATCHER_LOG_FILE,
99
            metavar="FILE",
100
            help="Write log to FILE instead of %s" %
101
            settings.DISPATCHER_LOG_FILE)
102

  
103
    return parser.parse_args(args)
104

  
105
def exit_handler(signum, frame):
106
    global handler_logger
107

  
108
    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
109
    raise SystemExit
110

  
111
def init_queues(debug):
112
    chan = None
113
    if debug:
114
        chan = init_devel()
115
    else:
116
        chan = init()
117
    return chan
118

  
119
def open_channel():
120
    conn = None
121
    while conn == None:
122
        logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
123
        try:
124
            conn = amqp.Connection( host=settings.RABBIT_HOST,
125
                                    userid=settings.RABBIT_USERNAME,
126
                                    password=settings.RABBIT_PASSWORD,
127
                                    virtual_host=settings.RABBIT_VHOST)
128
        except socket.error:
129
            time.sleep(1)
130
            pass
131

  
132
    logger.info("Connection succesful, opening channel")
133
    return conn.channel()
134

  
135
def main():
136
    global logger
137
    (opts, args) = parse_arguments(sys.argv[1:])
138

  
139
    # Initialize logger
140
    lvl = logging.DEBUG if opts.debug else logging.INFO
141
    logger = logging.getLogger("okeanos.dispatcher")
142
    logger.setLevel(lvl)
143
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
144
            "%Y-%m-%d %H:%M:%S")
145
    handler = logging.FileHandler(opts.log_file)
146
    handler.setFormatter(formatter)
147
    logger.addHandler(handler)
148

  
149
    #Init the queues
150
    chan = init_queues(opts.debug)
151

  
152
    # Become a daemon:
153
    # Redirect stdout and stderr to handler.stream to catch
154
    # early errors in the daemonization process [e.g., pidfile creation]
155
    # which will otherwise go to /dev/null.
156
    daemon_context = daemon.DaemonContext(
157
            umask=022,
158
            stdout=handler.stream,
159
            stderr=handler.stream,
160
            files_preserve=[handler.stream])
161
    daemon_context.open()
162
    logger.info("Became a daemon")
163
    
164
    # Catch signals to ensure graceful shutdown
165
    signal(SIGINT, exit_handler)
166
    signal(SIGTERM, exit_handler)
167
    signal(SIGKILL, exit_handler)
71 168

  
72 169
    while True:
73
        chan.wait()
170
        try:
171
            chan.wait()
172
        except SystemExit:
173
            break
74 174

  
75 175
    chan.basic_cancel("dbupdater")
76 176
    chan.close()
77
    conn.close()
78
#TODO: Implement proper shutdown of channel
177
    chan.connection.close()
79 178

  
80 179
if __name__ == "__main__":
81 180
    logging.basicConfig(level=logging.DEBUG)
82 181
    sys.exit(main())
83 182

  
84
# vim: set ts=4 sts=4 sw=4 et ai :
183
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :

Also available in: Unified diff