root / db / db_controller.py @ c92af313
History | View | Annotate | Download (4 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
"""Receive Ganeti events over 0mq, update VM state in DB.
|
6 |
|
7 |
This daemon receives job notifications from ganeti-0mqd
|
8 |
and updates VM state in the DB accordingly.
|
9 |
|
10 |
"""
|
11 |
|
12 |
from django.core.management import setup_environ |
13 |
|
14 |
import sys |
15 |
import os |
16 |
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
17 |
sys.path.append(path) |
18 |
import settings |
19 |
|
20 |
setup_environ(settings) |
21 |
|
22 |
import zmq |
23 |
import time |
24 |
import json |
25 |
import platform |
26 |
import logging |
27 |
import traceback |
28 |
|
29 |
from threading import Thread, Event, currentThread |
30 |
|
31 |
from synnefo.db.models import VirtualMachine |
32 |
|
33 |
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py |
34 |
|
35 |
class StoppableThread(Thread): |
36 |
"""Thread class with a stop() method.
|
37 |
|
38 |
The thread needs to check regularly for the stopped() condition.
|
39 |
When it does, it exits, so that another thread may .join() it.
|
40 |
|
41 |
"""
|
42 |
|
43 |
def __init__(self, *args, **kwargs): |
44 |
super(StoppableThread, self).__init__(*args, **kwargs) |
45 |
self._stop = Event()
|
46 |
|
47 |
def stop(self): |
48 |
self._stop.set()
|
49 |
|
50 |
def stopped(self): |
51 |
return self._stop.isSet() |
52 |
|
53 |
|
54 |
def zmq_sub_thread(subscriber): |
55 |
while True: |
56 |
logging.debug("Entering 0mq to wait for message on SUB socket.")
|
57 |
data = subscriber.recv() |
58 |
logging.debug("Received message on 0mq SUB socket.")
|
59 |
try:
|
60 |
msg = json.loads(data) |
61 |
|
62 |
if currentThread().stopped():
|
63 |
logging.debug("Thread has been stopped, leaving request loop.")
|
64 |
return
|
65 |
|
66 |
if msg["type"] != "ganeti-op-status": |
67 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"])) |
68 |
continue
|
69 |
|
70 |
vmid = VirtualMachine.id_from_instance_name(msg["instance"])
|
71 |
vm = VirtualMachine.objects.get(id=vmid) |
72 |
|
73 |
logging.debug("Processing msg: %s" % (msg))
|
74 |
vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
75 |
vm.save() |
76 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
77 |
|
78 |
except KeyError: |
79 |
logging.error("Malformed incoming JSON, missing attributes: " + data)
|
80 |
except VirtualMachine.InvalidBackendIdError:
|
81 |
logging.debug("Ignoring msg for unknown instance %s." % msg["instance"]) |
82 |
except VirtualMachine.DoesNotExist:
|
83 |
logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
84 |
except Exception as e: |
85 |
logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
86 |
continue
|
87 |
|
88 |
def main(): |
89 |
# Create an inproc PUB socket, for inter-thread communication
|
90 |
zmqc = zmq.Context() |
91 |
inproc = zmqc.socket(zmq.PUB) |
92 |
inproc.bind("inproc://threads")
|
93 |
|
94 |
# Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
|
95 |
subscriber = zmqc.socket(zmq.SUB) |
96 |
subscriber.setsockopt(zmq.IDENTITY, platform.node() + "snf-db-controller")
|
97 |
subscriber.setsockopt(zmq.SUBSCRIBE, "")
|
98 |
subscriber.connect(GANETI_ZMQ_PUBLISHER) |
99 |
subscriber.connect("inproc://threads")
|
100 |
|
101 |
# Use a separate thread to process incoming messages,
|
102 |
# needed because the Python runtime interacts badly with 0mq's blocking semantics.
|
103 |
zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,)) |
104 |
zmqt.start() |
105 |
|
106 |
try:
|
107 |
logging.info("in main thread.");
|
108 |
while True: |
109 |
logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
|
110 |
time.sleep(600)
|
111 |
except:
|
112 |
logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
113 |
|
114 |
#
|
115 |
# Cleanup.
|
116 |
#
|
117 |
# Cancel the suscriber thread, wake it up, then join it.
|
118 |
zmqt.stop() |
119 |
inproc.send_json({"type":"null"}) |
120 |
zmqt.join() |
121 |
|
122 |
return 1 |
123 |
|
124 |
if __name__ == "__main__": |
125 |
logging.basicConfig(level=logging.DEBUG) |
126 |
sys.exit(main()) |
127 |
|
128 |
# vim: set ts=4 sts=4 sw=4 et ai :
|