root / db / db_controller.py @ d08a5f6f
History | View | Annotate | Download (2.5 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network
|
4 |
#
|
5 |
"""Receive Ganeti events over 0mq, update VM state in DB.
|
6 |
|
7 |
This daemon receives job notifications from ganeti-0mqd
|
8 |
and updates VM state in the DB accordingly.
|
9 |
|
10 |
"""
|
11 |
|
12 |
from django.core.management import setup_environ |
13 |
|
14 |
import sys |
15 |
# FIXME
|
16 |
# WIP: Fix the $PATH, append /home/devel, where synnefo/ resides.
|
17 |
# Eventually, there will be a wrapper script for synnefo.db.DBController.
|
18 |
sys.path.append("/home/devel")
|
19 |
from synnefo import settings |
20 |
|
21 |
setup_environ(settings) |
22 |
|
23 |
import sys |
24 |
import zmq |
25 |
import time |
26 |
import json |
27 |
import logging |
28 |
import traceback |
29 |
|
30 |
from synnefo.db.models import VirtualMachine |
31 |
|
32 |
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py |
33 |
|
34 |
def main(): |
35 |
# Connect to ganeti-0mqd
|
36 |
zmqc = zmq.Context() |
37 |
subscriber = zmqc.socket(zmq.SUB) |
38 |
subscriber.setsockopt(zmq.IDENTITY, "snf-db-controller")
|
39 |
subscriber.setsockopt(zmq.SUBSCRIBE, "")
|
40 |
subscriber.connect(GANETI_ZMQ_PUBLISHER) |
41 |
|
42 |
# FIXME: Logging
|
43 |
logging.info("Subscribed to %s. Press Ctrl-\ to quit." % GANETI_ZMQ_PUBLISHER)
|
44 |
|
45 |
# Get updates, expect random Ctrl-C death
|
46 |
# FIXME: Ctrl-C (SIGINT) does not work with .recv(),
|
47 |
# try Ctrl-\ (SIGQUIT) instead.
|
48 |
while True: |
49 |
data = subscriber.recv() |
50 |
try:
|
51 |
msg = json.loads(data) |
52 |
|
53 |
if msg["type"] != "ganeti-op-status": |
54 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"])) |
55 |
continue
|
56 |
|
57 |
vmid = VirtualMachine.id_from_instance_name(msg["instance"])
|
58 |
vm = VirtualMachine.objects.get(id=vmid) |
59 |
|
60 |
logging.debug("Processing msg: %s" % (msg))
|
61 |
vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
62 |
vm.save() |
63 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
64 |
|
65 |
except KeyError: |
66 |
logging.error("Malformed incoming JSON, missing attributes: " + data)
|
67 |
except VirtualMachine.InvalidBackendIdError:
|
68 |
logging.debug("Ignoring msg for unknown instance %s." % msg["instance"]) |
69 |
except VirtualMachine.DoesNotExist:
|
70 |
logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
71 |
except Exception as e: |
72 |
logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
73 |
continue
|
74 |
|
75 |
if __name__ == "__main__": |
76 |
logging.basicConfig(level=logging.DEBUG) |
77 |
sys.exit(main()) |
78 |
|
79 |
# vim: set ts=4 sts=4 sw=4 et ai :
|