root / db / db_controller.py @ 432fc8c3
History | View | Annotate | Download (4.3 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
"""Receive Ganeti events over 0mq, update VM state in DB.
|
6 |
|
7 |
This daemon receives job notifications from ganeti-0mqd
|
8 |
and updates VM state in the DB accordingly.
|
9 |
|
10 |
"""
|
11 |
|
12 |
from django.core.management import setup_environ |
13 |
|
14 |
import sys |
15 |
import os |
16 |
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
17 |
sys.path.append(path) |
18 |
import synnefo.settings as settings |
19 |
|
20 |
setup_environ(settings) |
21 |
|
22 |
import zmq |
23 |
import time |
24 |
import json |
25 |
import platform |
26 |
import logging |
27 |
import getpass |
28 |
import traceback |
29 |
|
30 |
from threading import Thread, Event, currentThread |
31 |
from synnefo.db.models import VirtualMachine |
32 |
from synnefo.settings import GANETI_ZMQ_PUBLISHER |
33 |
from synnefo.logic import utils, backend |
34 |
|
35 |
class StoppableThread(Thread): |
36 |
"""Thread class with a stop() method.
|
37 |
|
38 |
The thread needs to check regularly for the stopped() condition.
|
39 |
When it does, it exits, so that another thread may .join() it.
|
40 |
|
41 |
"""
|
42 |
|
43 |
def __init__(self, *args, **kwargs): |
44 |
super(StoppableThread, self).__init__(*args, **kwargs) |
45 |
self._stop = Event()
|
46 |
|
47 |
def stop(self): |
48 |
self._stop.set()
|
49 |
|
50 |
def stopped(self): |
51 |
return self._stop.isSet() |
52 |
|
53 |
|
54 |
def zmq_sub_thread(subscriber): |
55 |
while True: |
56 |
logging.debug("Entering 0mq to wait for message on SUB socket.")
|
57 |
data = subscriber.recv() |
58 |
logging.debug("Received message on 0mq SUB socket.")
|
59 |
try:
|
60 |
msg = json.loads(data) |
61 |
|
62 |
if currentThread().stopped():
|
63 |
logging.debug("Thread has been stopped, leaving request loop.")
|
64 |
return
|
65 |
|
66 |
if msg["type"] != "ganeti-op-status": |
67 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"],)) |
68 |
continue
|
69 |
|
70 |
vmid = utils.id_from_instance_name(msg["instance"])
|
71 |
vm = VirtualMachine.objects.get(id=vmid) |
72 |
|
73 |
logging.debug("Processing msg: %s" % (msg,))
|
74 |
backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
75 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
76 |
|
77 |
except KeyError: |
78 |
logging.error("Malformed incoming JSON, missing attributes: " + data)
|
79 |
except VirtualMachine.InvalidBackendIdError:
|
80 |
logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],)) |
81 |
except VirtualMachine.DoesNotExist:
|
82 |
logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
83 |
except Exception as e: |
84 |
logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
85 |
continue
|
86 |
|
87 |
def main(): |
88 |
# Create an inproc PUB socket, for inter-thread communication
|
89 |
zmqc = zmq.Context() |
90 |
inproc = zmqc.socket(zmq.PUB) |
91 |
inproc.bind("inproc://threads")
|
92 |
|
93 |
#
|
94 |
# Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
|
95 |
#
|
96 |
subscriber = zmqc.socket(zmq.SUB) |
97 |
|
98 |
# Combine the hostname, username and a constant string to get
|
99 |
# a hopefully unique identity for this 0mq peer.
|
100 |
# Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
|
101 |
# https://github.com/zeromq/zeromq2/issues/30
|
102 |
subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
|
103 |
subscriber.setsockopt(zmq.SUBSCRIBE, "")
|
104 |
subscriber.connect(GANETI_ZMQ_PUBLISHER) |
105 |
subscriber.connect("inproc://threads")
|
106 |
|
107 |
# Use a separate thread to process incoming messages,
|
108 |
# needed because the Python runtime interacts badly with 0mq's blocking semantics.
|
109 |
zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,)) |
110 |
zmqt.start() |
111 |
|
112 |
try:
|
113 |
logging.info("in main thread.");
|
114 |
while True: |
115 |
logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
|
116 |
time.sleep(600)
|
117 |
except:
|
118 |
logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
119 |
|
120 |
#
|
121 |
# Cleanup.
|
122 |
#
|
123 |
# Cancel the suscriber thread, wake it up, then join it.
|
124 |
zmqt.stop() |
125 |
inproc.send_json({"type":"null"}) |
126 |
zmqt.join() |
127 |
|
128 |
return 1 |
129 |
|
130 |
if __name__ == "__main__": |
131 |
logging.basicConfig(level=logging.DEBUG) |
132 |
sys.exit(main()) |
133 |
|
134 |
# vim: set ts=4 sts=4 sw=4 et ai :
|