root / db / db_controller.py @ e3a99a08
History | View | Annotate | Download (4.3 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
"""Receive Ganeti events over 0mq, update VM state in DB.
|
6 |
|
7 |
This daemon receives job notifications from ganeti-0mqd
|
8 |
and updates VM state in the DB accordingly.
|
9 |
|
10 |
"""
|
11 |
|
12 |
from django.core.management import setup_environ |
13 |
|
14 |
import sys |
15 |
import os |
16 |
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
17 |
sys.path.append(path) |
18 |
import synnefo.settings as settings |
19 |
|
20 |
setup_environ(settings) |
21 |
|
22 |
import zmq |
23 |
import time |
24 |
import json |
25 |
import platform |
26 |
import logging |
27 |
import getpass |
28 |
import traceback |
29 |
|
30 |
from threading import Thread, Event, currentThread |
31 |
|
32 |
from synnefo.db.models import VirtualMachine |
33 |
from synnefo.logic import utils |
34 |
|
35 |
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py |
36 |
|
37 |
class StoppableThread(Thread): |
38 |
"""Thread class with a stop() method.
|
39 |
|
40 |
The thread needs to check regularly for the stopped() condition.
|
41 |
When it does, it exits, so that another thread may .join() it.
|
42 |
|
43 |
"""
|
44 |
|
45 |
def __init__(self, *args, **kwargs): |
46 |
super(StoppableThread, self).__init__(*args, **kwargs) |
47 |
self._stop = Event()
|
48 |
|
49 |
def stop(self): |
50 |
self._stop.set()
|
51 |
|
52 |
def stopped(self): |
53 |
return self._stop.isSet() |
54 |
|
55 |
|
56 |
def zmq_sub_thread(subscriber): |
57 |
while True: |
58 |
logging.debug("Entering 0mq to wait for message on SUB socket.")
|
59 |
data = subscriber.recv() |
60 |
logging.debug("Received message on 0mq SUB socket.")
|
61 |
try:
|
62 |
msg = json.loads(data) |
63 |
|
64 |
if currentThread().stopped():
|
65 |
logging.debug("Thread has been stopped, leaving request loop.")
|
66 |
return
|
67 |
|
68 |
if msg["type"] != "ganeti-op-status": |
69 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"],)) |
70 |
continue
|
71 |
|
72 |
vmid = utils.id_from_instance_name(msg["instance"])
|
73 |
vm = VirtualMachine.objects.get(id=vmid) |
74 |
|
75 |
logging.debug("Processing msg: %s" % (msg,))
|
76 |
vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
77 |
vm.save() |
78 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
79 |
|
80 |
except KeyError: |
81 |
logging.error("Malformed incoming JSON, missing attributes: " + data)
|
82 |
except VirtualMachine.InvalidBackendIdError:
|
83 |
logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],)) |
84 |
except VirtualMachine.DoesNotExist:
|
85 |
logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
86 |
except Exception as e: |
87 |
logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
88 |
continue
|
89 |
|
90 |
def main(): |
91 |
# Create an inproc PUB socket, for inter-thread communication
|
92 |
zmqc = zmq.Context() |
93 |
inproc = zmqc.socket(zmq.PUB) |
94 |
inproc.bind("inproc://threads")
|
95 |
|
96 |
#
|
97 |
# Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
|
98 |
#
|
99 |
subscriber = zmqc.socket(zmq.SUB) |
100 |
|
101 |
# Combine the hostname, username and a constant string to get
|
102 |
# a hopefully unique identity for this 0mq peer.
|
103 |
# Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
|
104 |
# https://github.com/zeromq/zeromq2/issues/30
|
105 |
subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
|
106 |
subscriber.setsockopt(zmq.SUBSCRIBE, "")
|
107 |
subscriber.connect(GANETI_ZMQ_PUBLISHER) |
108 |
subscriber.connect("inproc://threads")
|
109 |
|
110 |
# Use a separate thread to process incoming messages,
|
111 |
# needed because the Python runtime interacts badly with 0mq's blocking semantics.
|
112 |
zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,)) |
113 |
zmqt.start() |
114 |
|
115 |
try:
|
116 |
logging.info("in main thread.");
|
117 |
while True: |
118 |
logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
|
119 |
time.sleep(600)
|
120 |
except:
|
121 |
logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
122 |
|
123 |
#
|
124 |
# Cleanup.
|
125 |
#
|
126 |
# Cancel the suscriber thread, wake it up, then join it.
|
127 |
zmqt.stop() |
128 |
inproc.send_json({"type":"null"}) |
129 |
zmqt.join() |
130 |
|
131 |
return 1 |
132 |
|
133 |
if __name__ == "__main__": |
134 |
logging.basicConfig(level=logging.DEBUG) |
135 |
sys.exit(main()) |
136 |
|
137 |
# vim: set ts=4 sts=4 sw=4 et ai :
|