Revision d08a5f6f db/db_controller.py
b/db/db_controller.py | ||
---|---|---|
1 |
#!/usr/bin/env python |
|
1 | 2 |
# |
2 |
# Bill Allocator - Administration script |
|
3 |
# |
|
4 |
# Run all the time, and wait for messages from ganeti, then update the database |
|
5 |
# |
|
6 |
# Copyright 2010 Greek Research and Technology Network |
|
3 |
# Copyright (c) 2010 Greek Research and Technology Network |
|
7 | 4 |
# |
5 |
"""Receive Ganeti events over 0mq, update VM state in DB. |
|
8 | 6 |
|
9 |
import zmq |
|
7 |
This daemon receives job notifications from ganeti-0mqd |
|
8 |
and updates VM state in the DB accordingly. |
|
10 | 9 |
|
11 |
from db.models import *
|
|
10 |
"""
|
|
12 | 11 |
|
13 |
#GANETI_ZMQ_PUBLISHER = "tcp://ganeti-master:5801"
|
|
12 |
from django.core.management import setup_environ
|
|
14 | 13 |
|
15 |
def init_publisher(context): |
|
16 |
request = context.socket(zmq.REQ) |
|
17 |
request.connect('tcp://127.0.0.1:6666') |
|
18 |
request.send_json('{ "message" : "hello" }') |
|
19 |
|
|
20 |
message = request.recv_json() |
|
14 |
import sys |
|
15 |
# FIXME |
|
16 |
# WIP: Fix the $PATH, append /home/devel, where synnefo/ resides. |
|
17 |
# Eventually, there will be a wrapper script for synnefo.db.DBController. |
|
18 |
sys.path.append("/home/devel") |
|
19 |
from synnefo import settings |
|
20 |
|
|
21 |
setup_environ(settings) |
|
22 |
|
|
23 |
import sys |
|
24 |
import zmq |
|
25 |
import time |
|
26 |
import json |
|
27 |
import logging |
|
28 |
import traceback |
|
29 |
|
|
30 |
from synnefo.db.models import VirtualMachine |
|
31 |
|
|
32 |
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py |
|
21 | 33 |
|
22 | 34 |
def main(): |
23 |
context = zmq.Context() |
|
24 |
|
|
25 |
subscriber = context.socket(zmq.SUB) |
|
26 |
subscriber.connect('tcp://127.0.0.1:5801') |
|
27 |
|
|
28 |
# accept all messages |
|
29 |
subscriber.setsockopt(zmq.IDENTITY, "DBController") |
|
30 |
subscriber.setsockopt(zmq.SUBSCRIBE, '') |
|
31 |
|
|
32 |
init_publisher(context) |
|
33 |
|
|
35 |
# Connect to ganeti-0mqd |
|
36 |
zmqc = zmq.Context() |
|
37 |
subscriber = zmqc.socket(zmq.SUB) |
|
38 |
subscriber.setsockopt(zmq.IDENTITY, "snf-db-controller") |
|
39 |
subscriber.setsockopt(zmq.SUBSCRIBE, "") |
|
40 |
subscriber.connect(GANETI_ZMQ_PUBLISHER) |
|
41 |
|
|
42 |
# FIXME: Logging |
|
43 |
logging.info("Subscribed to %s. Press Ctrl-\ to quit." % GANETI_ZMQ_PUBLISHER) |
|
44 |
|
|
45 |
# Get updates, expect random Ctrl-C death |
|
46 |
# FIXME: Ctrl-C (SIGINT) does not work with .recv(), |
|
47 |
# try Ctrl-\ (SIGQUIT) instead. |
|
34 | 48 |
while True: |
35 |
message = sock.recv_json() |
|
49 |
data = subscriber.recv() |
|
50 |
try: |
|
51 |
msg = json.loads(data) |
|
52 |
|
|
53 |
if msg["type"] != "ganeti-op-status": |
|
54 |
logging.debug("Ignoring message of uknown type %s." % (msg["type"])) |
|
55 |
continue |
|
56 |
|
|
57 |
vmid = VirtualMachine.id_from_instance_name(msg["instance"]) |
|
58 |
vm = VirtualMachine.objects.get(id=vmid) |
|
36 | 59 |
|
37 |
subscriber.close() |
|
60 |
logging.debug("Processing msg: %s" % (msg)) |
|
61 |
vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
|
62 |
vm.save() |
|
63 |
logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
|
64 |
|
|
65 |
except KeyError: |
|
66 |
logging.error("Malformed incoming JSON, missing attributes: " + data) |
|
67 |
except VirtualMachine.InvalidBackendIdError: |
|
68 |
logging.debug("Ignoring msg for unknown instance %s." % msg["instance"]) |
|
69 |
except VirtualMachine.DoesNotExist: |
|
70 |
logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
|
71 |
except Exception as e: |
|
72 |
logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
|
73 |
continue |
|
74 |
|
|
75 |
if __name__ == "__main__": |
|
76 |
logging.basicConfig(level=logging.DEBUG) |
|
77 |
sys.exit(main()) |
|
78 |
|
|
79 |
# vim: set ts=4 sts=4 sw=4 et ai : |
Also available in: Unified diff