Revision da102335 db/db_controller.py
b/db/db_controller.py | ||
---|---|---|
2 | 2 |
# |
3 | 3 |
# Copyright (c) 2010 Greek Research and Technology Network |
4 | 4 |
# |
5 |
"""Receive Ganeti events over 0mq, update VM state in DB.
|
|
5 |
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
|
|
6 | 6 |
|
7 |
This daemon receives job notifications from ganeti-0mqd
|
|
7 |
This daemon receives job notifications from ganeti-amqpd
|
|
8 | 8 |
and updates VM state in the DB accordingly. |
9 | 9 |
|
10 | 10 |
""" |
... | ... | |
19 | 19 |
|
20 | 20 |
setup_environ(settings) |
21 | 21 |
|
22 |
import zmq
|
|
23 |
import time |
|
22 |
from amqplib import client_0_8 as amqp
|
|
23 |
|
|
24 | 24 |
import json |
25 |
import platform |
|
26 | 25 |
import logging |
27 |
import getpass |
|
28 | 26 |
import traceback |
29 | 27 |
|
30 |
from threading import Thread, Event, currentThread |
|
31 | 28 |
from synnefo.db.models import VirtualMachine |
32 |
from synnefo.settings import GANETI_MASTER_IP, GANETI_0MQD_PUB_PORT |
|
33 | 29 |
from synnefo.logic import utils, backend |
34 | 30 |
|
35 |
class StoppableThread(Thread): |
|
36 |
"""Thread class with a stop() method. |
|
37 |
|
|
38 |
The thread needs to check regularly for the stopped() condition. |
|
39 |
When it does, it exits, so that another thread may .join() it. |
|
40 |
|
|
41 |
""" |
|
42 |
|
|
43 |
def __init__(self, *args, **kwargs): |
|
44 |
Thread.__init__(self, *args, **kwargs) |
|
45 |
self._stop = Event() |
|
46 |
|
|
47 |
def stop(self): |
|
48 |
self._stop.set() |
|
31 |
def update_db(message):
    """Apply one Ganeti job notification received over RabbitMQ to the DB.

    Intended as the consumer callback passed to ``basic_consume``.

    ``message`` must expose:
      * ``body``          -- the JSON-encoded notification text,
      * ``channel``       -- the channel the message arrived on,
      * ``delivery_tag``  -- the tag used to acknowledge the message.

    Notifications whose "type" is not "ganeti-op-status" are ignored.
    All processing errors are logged rather than propagated, and the
    message is acknowledged in every case (see the ``finally`` clause).
    """
    logging.debug("Received message from RabbitMQ")
    try:
        msg = json.loads(message.body)

        if msg["type"] != "ganeti-op-status":
            logging.debug("Ignoring message of uknown type %s.", msg["type"])
            return

        vmid = utils.id_from_instance_name(msg["instance"])
        vm = VirtualMachine.objects.get(id=vmid)

        logging.debug("Processing msg: %s", msg)
        backend.process_backend_msg(vm, msg["jobId"], msg["operation"],
                                    msg["status"], msg["logmsg"])
        logging.debug("Done processing msg for vm %s.", msg["instance"])
    except KeyError:
        # The original handler referenced an undefined name here, which
        # turned every malformed message into a NameError; log the raw
        # message body instead.
        logging.error("Malformed incoming JSON, missing attributes: %s",
                      message.body)
    except VirtualMachine.InvalidBackendIdError:
        logging.debug("Ignoring msg for unknown instance %s.",
                      msg["instance"])
    except VirtualMachine.DoesNotExist:
        logging.error("VM for instance %s with id %d not found in DB.",
                      msg["instance"], vmid)
    except Exception:
        # Catch-all boundary for the consumer callback: log the full
        # traceback (this also covers bodies that are not valid JSON).
        logging.error("Unexpected error:\n" + traceback.format_exc())
    finally:
        # Acknowledge unconditionally, on success, on early return and
        # after any logged error.
        message.channel.basic_ack(message.delivery_tag)
|
49 | 58 |
|
50 |
def stopped(self): |
|
51 |
return self._stop.isSet() |
|
59 |
def main(): |
|
52 | 60 |
|
61 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
62 |
userid=settings.RABBIT_USERNAME, |
|
63 |
password=settings.RABBIT_PASSWORD, |
|
64 |
virtual_host=settings.RABBIT_VHOST) |
|
65 |
chan = conn.channel() |
|
66 |
chan.queue_declare(queue="events", durable=True, exclusive=False, auto_delete=False) |
|
67 |
chan.exchange_declare(exchange="ganeti", type="direct", durable=True, |
|
68 |
auto_delete=False) |
|
69 |
chan.queue_bind(queue="events", exchange="ganeti", routing_key="eventd") |
|
70 |
chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater") |
|
53 | 71 |
|
54 |
def zmq_sub_thread(subscriber): |
|
55 |
logging.error("Entering 0mq to wait for message on SUB socket.") |
|
56 | 72 |
while True: |
57 |
logging.debug("Entering 0mq to wait for message on SUB socket.") |
|
58 |
data = subscriber.recv() |
|
59 |
logging.debug("Received message on 0mq SUB socket.") |
|
60 |
try: |
|
61 |
msg = json.loads(data) |
|
62 |
|
|
63 |
if currentThread().stopped(): |
|
64 |
logging.debug("Thread has been stopped, leaving request loop.") |
|
65 |
return |
|
66 |
|
|
67 |
if msg["type"] != "ganeti-op-status": |
|
68 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"],)) |
|
69 |
continue |
|
70 |
|
|
71 |
vmid = utils.id_from_instance_name(msg["instance"]) |
|
72 |
vm = VirtualMachine.objects.get(id=vmid) |
|
73 |
|
|
74 |
logging.debug("Processing msg: %s" % (msg,)) |
|
75 |
backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
|
76 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
|
77 |
|
|
78 |
except KeyError: |
|
79 |
logging.error("Malformed incoming JSON, missing attributes: " + data) |
|
80 |
except VirtualMachine.InvalidBackendIdError: |
|
81 |
logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],)) |
|
82 |
except VirtualMachine.DoesNotExist: |
|
83 |
logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
|
84 |
except Exception as e: |
|
85 |
logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
|
86 |
continue |
|
87 |
|
|
88 |
def main(): |
|
89 |
# Create an inproc PUB socket, for inter-thread communication |
|
90 |
zmqc = zmq.Context() |
|
91 |
inproc = zmqc.socket(zmq.PUB) |
|
92 |
inproc.bind("inproc://threads") |
|
93 |
|
|
94 |
# |
|
95 |
# Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket |
|
96 |
# |
|
97 |
subscriber = zmqc.socket(zmq.SUB) |
|
98 |
|
|
99 |
# Combine the hostname, username and a constant string to get |
|
100 |
# a hopefully unique identity for this 0mq peer. |
|
101 |
# Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug: |
|
102 |
# https://github.com/zeromq/zeromq2/issues/30 |
|
103 |
GANETI_ZMQ_PUBLISHER = "tcp://%s:%d" % (GANETI_MASTER_IP, int(GANETI_0MQD_PUB_PORT)) |
|
104 |
subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller") |
|
105 |
subscriber.setsockopt(zmq.SUBSCRIBE, "") |
|
106 |
subscriber.connect(GANETI_ZMQ_PUBLISHER) |
|
107 |
subscriber.connect("inproc://threads") |
|
108 |
|
|
109 |
# Use a separate thread to process incoming messages, |
|
110 |
# needed because the Python runtime interacts badly with 0mq's blocking semantics. |
|
111 |
zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,)) |
|
112 |
zmqt.start() |
|
73 |
chan.wait() |
|
113 | 74 |
|
114 |
try: |
|
115 |
logging.info("in main thread."); |
|
116 |
while True: |
|
117 |
logging.info("When I grow up, I'll be syncing with Ganeti at this point.") |
|
118 |
time.sleep(600) |
|
119 |
except: |
|
120 |
logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
|
121 |
|
|
122 |
# |
|
123 |
# Cleanup. |
|
124 |
# |
|
125 |
# Cancel the subscriber thread, wake it up, then join it. |
|
126 |
zmqt.stop() |
|
127 |
inproc.send_json({"type":"null"}) |
|
128 |
zmqt.join() |
|
129 |
|
|
130 |
return 1 |
|
75 |
chan.basic_cancel("dbupdater") |
|
76 |
chan.close() |
|
77 |
conn.close() |
|
78 |
#TODO: Implement proper shutdown of channel |
|
131 | 79 |
|
132 | 80 |
if __name__ == "__main__": |
133 | 81 |
logging.basicConfig(level=logging.DEBUG) |
Also available in: Unified diff