Revision 8d8ea051 db/db_controller.py
b/db/db_controller.py | ||
---|---|---|
24 | 24 |
import json |
25 | 25 |
import logging |
26 | 26 |
import traceback |
27 |
import time |
|
28 |
import socket |
|
29 |
|
|
30 |
import daemon |
|
31 |
from signal import signal, SIGINT, SIGTERM, SIGKILL |
|
27 | 32 |
|
28 | 33 |
from synnefo.db.models import VirtualMachine |
29 | 34 |
from synnefo.logic import utils, backend |
30 | 35 |
|
36 |
logger = None |
|
37 |
|
|
31 | 38 |
def update_db(message): |
32 |
logging.debug("Received message from RabbitMQ") |
|
33 | 39 |
try: |
34 | 40 |
msg = json.loads(message.body) |
35 | 41 |
|
36 | 42 |
if msg["type"] != "ganeti-op-status": |
37 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
|
|
43 |
logging.error("Message is of uknown type %s." % (msg["type"],))
|
|
38 | 44 |
return |
39 | 45 |
|
40 | 46 |
vmid = utils.id_from_instance_name(msg["instance"]) |
... | ... | |
45 | 51 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
46 | 52 |
|
47 | 53 |
except KeyError: |
48 |
logging.error("Malformed incoming JSON, missing attributes: " + message_data)
|
|
54 |
logging.error("Malformed incoming JSON, missing attributes: " + message.body)
|
|
49 | 55 |
except VirtualMachine.InvalidBackendIdError: |
50 | 56 |
logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],)) |
51 | 57 |
except VirtualMachine.DoesNotExist: |
... | ... | |
56 | 62 |
finally: |
57 | 63 |
message.channel.basic_ack(message.delivery_tag) |
58 | 64 |
|
59 |
def main(): |
|
65 |
def send_email(message): |
|
66 |
logger.debug("Request to send email message") |
|
67 |
message.channel.basic_ack(message.delivery_tag) |
|
68 |
|
|
69 |
def update_credits(message): |
|
70 |
logger.debug("Request to update credits") |
|
71 |
message.channel.basic_ack(message.delivery_tag) |
|
72 |
|
|
73 |
def declare_queues(chan): |
|
74 |
chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="topic", durable=True, auto_delete=False) |
|
75 |
chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False) |
|
60 | 76 |
|
61 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
62 |
userid=settings.RABBIT_USERNAME, |
|
63 |
password=settings.RABBIT_PASSWORD, |
|
64 |
virtual_host=settings.RABBIT_VHOST) |
|
65 |
chan = conn.channel() |
|
66 |
chan.queue_declare(queue="events", durable=True, exclusive=False, auto_delete=False) |
|
67 |
chan.exchange_declare(exchange="ganeti", type="direct", durable=True, |
|
68 |
auto_delete=False) |
|
69 |
chan.queue_bind(queue="events", exchange="ganeti", routing_key="eventd") |
|
77 |
def init_devel(): |
|
78 |
chan = open_channel() |
|
79 |
declare_queues(chan) |
|
80 |
chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*") |
|
70 | 81 |
chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater") |
82 |
return chan |
|
83 |
|
|
84 |
def init(): |
|
85 |
chan = open_channel() |
|
86 |
declare_queues(chan) |
|
87 |
chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*") |
|
88 |
chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater") |
|
89 |
return chan |
|
90 |
|
|
91 |
def parse_arguments(args): |
|
92 |
from optparse import OptionParser |
|
93 |
|
|
94 |
parser = OptionParser() |
|
95 |
parser.add_option("-d", "--debug", action="store_true", dest="debug", |
|
96 |
help="Enable debug mode") |
|
97 |
parser.add_option("-l", "--log", dest="log_file", |
|
98 |
default=settings.DISPATCHER_LOG_FILE, |
|
99 |
metavar="FILE", |
|
100 |
help="Write log to FILE instead of %s" % |
|
101 |
settings.DISPATCHER_LOG_FILE) |
|
102 |
|
|
103 |
return parser.parse_args(args) |
|
104 |
|
|
105 |
def exit_handler(signum, frame): |
|
106 |
global handler_logger |
|
107 |
|
|
108 |
handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum) |
|
109 |
raise SystemExit |
|
110 |
|
|
111 |
def init_queues(debug): |
|
112 |
chan = None |
|
113 |
if debug: |
|
114 |
chan = init_devel() |
|
115 |
else: |
|
116 |
chan = init() |
|
117 |
return chan |
|
118 |
|
|
119 |
def open_channel(): |
|
120 |
conn = None |
|
121 |
while conn == None: |
|
122 |
logger.info("Attempting to connect to %s", settings.RABBIT_HOST) |
|
123 |
try: |
|
124 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
125 |
userid=settings.RABBIT_USERNAME, |
|
126 |
password=settings.RABBIT_PASSWORD, |
|
127 |
virtual_host=settings.RABBIT_VHOST) |
|
128 |
except socket.error: |
|
129 |
time.sleep(1) |
|
130 |
pass |
|
131 |
|
|
132 |
logger.info("Connection succesful, opening channel") |
|
133 |
return conn.channel() |
|
134 |
|
|
135 |
def main(): |
|
136 |
global logger |
|
137 |
(opts, args) = parse_arguments(sys.argv[1:]) |
|
138 |
|
|
139 |
# Initialize logger |
|
140 |
lvl = logging.DEBUG if opts.debug else logging.INFO |
|
141 |
logger = logging.getLogger("okeanos.dispatcher") |
|
142 |
logger.setLevel(lvl) |
|
143 |
formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s", |
|
144 |
"%Y-%m-%d %H:%M:%S") |
|
145 |
handler = logging.FileHandler(opts.log_file) |
|
146 |
handler.setFormatter(formatter) |
|
147 |
logger.addHandler(handler) |
|
148 |
|
|
149 |
#Init the queues |
|
150 |
chan = init_queues(opts.debug) |
|
151 |
|
|
152 |
# Become a daemon: |
|
153 |
# Redirect stdout and stderr to handler.stream to catch |
|
154 |
# early errors in the daemonization process [e.g., pidfile creation] |
|
155 |
# which will otherwise go to /dev/null. |
|
156 |
daemon_context = daemon.DaemonContext( |
|
157 |
umask=022, |
|
158 |
stdout=handler.stream, |
|
159 |
stderr=handler.stream, |
|
160 |
files_preserve=[handler.stream]) |
|
161 |
daemon_context.open() |
|
162 |
logger.info("Became a daemon") |
|
163 |
|
|
164 |
# Catch signals to ensure graceful shutdown |
|
165 |
signal(SIGINT, exit_handler) |
|
166 |
signal(SIGTERM, exit_handler) |
|
167 |
signal(SIGKILL, exit_handler) |
|
71 | 168 |
|
72 | 169 |
while True: |
73 |
chan.wait() |
|
170 |
try: |
|
171 |
chan.wait() |
|
172 |
except SystemExit: |
|
173 |
break |
|
74 | 174 |
|
75 | 175 |
chan.basic_cancel("dbupdater") |
76 | 176 |
chan.close() |
77 |
conn.close() |
|
78 |
#TODO: Implement proper shutdown of channel |
|
177 |
chan.connection.close() |
|
79 | 178 |
|
80 | 179 |
if __name__ == "__main__": |
81 | 180 |
logging.basicConfig(level=logging.DEBUG) |
82 | 181 |
sys.exit(main()) |
83 | 182 |
|
84 |
# vim: set ts=4 sts=4 sw=4 et ai : |
|
183 |
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |
Also available in: Unified diff