Revision d08a5f6f db/db_controller.py

b/db/db_controller.py
1
#!/usr/bin/env python
1 2
#
2
# Bill Allocator - Administration script
3
#
4
# Run all the time, and wait for messages from ganeti, then update the database
5
#
6
# Copyright 2010 Greek Research and Technology Network
3
# Copyright (c) 2010 Greek Research and Technology Network
7 4
#
5
"""Receive Ganeti events over 0mq, update VM state in DB.
8 6

  
9
import zmq
7
This daemon receives job notifications from ganeti-0mqd
8
and updates VM state in the DB accordingly.
10 9

  
11
from db.models import *
10
"""
12 11

  
13
#GANETI_ZMQ_PUBLISHER = "tcp://ganeti-master:5801"
12
from django.core.management import setup_environ
14 13

  
15
def init_publisher(context):
16
    request = context.socket(zmq.REQ)
17
    request.connect('tcp://127.0.0.1:6666')
18
    request.send_json('{ "message" : "hello" }')
19
    
20
    message = request.recv_json()
14
import sys
15
# FIXME
16
# WIP: Fix the $PATH, append /home/devel, where synnefo/ resides.
17
# Eventually, there will be a wrapper script for synnefo.db.DBController.
18
sys.path.append("/home/devel")
19
from synnefo import settings
20

  
21
setup_environ(settings)
22

  
23
import sys
24
import zmq
25
import time
26
import json
27
import logging
28
import traceback
29

  
30
from synnefo.db.models import VirtualMachine
31

  
32
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py
21 33

  
22 34
def main():
23
    context = zmq.Context()
24
    
25
    subscriber = context.socket(zmq.SUB)
26
    subscriber.connect('tcp://127.0.0.1:5801')
27
    
28
    # accept all messages
29
    subscriber.setsockopt(zmq.IDENTITY, "DBController")
30
    subscriber.setsockopt(zmq.SUBSCRIBE, '')
31
    
32
    init_publisher(context)
33
    
35
    # Connect to ganeti-0mqd
36
    zmqc = zmq.Context()
37
    subscriber = zmqc.socket(zmq.SUB)
38
    subscriber.setsockopt(zmq.IDENTITY, "snf-db-controller")
39
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
40
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
41

  
42
    # FIXME: Logging
43
    logging.info("Subscribed to %s. Press Ctrl-\ to quit." % GANETI_ZMQ_PUBLISHER)
44

  
45
    # Get updates, expect random Ctrl-C death
46
    # FIXME: Ctrl-C (SIGINT) does not work with .recv(),
47
    # try Ctrl-\ (SIGQUIT) instead.
34 48
    while True:
35
        message = sock.recv_json()        
49
        data = subscriber.recv()
50
        try:
51
            msg = json.loads(data)
52

  
53
            if msg["type"] != "ganeti-op-status":
54
                logging.debug("Ignoring message of uknown type %s." % (msg["type"]))
55
                continue
56

  
57
            vmid = VirtualMachine.id_from_instance_name(msg["instance"])
58
            vm = VirtualMachine.objects.get(id=vmid)
36 59
    
37
    subscriber.close()
60
            logging.debug("Processing msg: %s" % (msg))
61
            vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
62
            vm.save()
63
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
64

  
65
        except KeyError:
66
            logging.error("Malformed incoming JSON, missing attributes: " + data)
67
        except VirtualMachine.InvalidBackendIdError:
68
            logging.debug("Ignoring msg for unknown instance %s." % msg["instance"])
69
        except VirtualMachine.DoesNotExist:
70
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
71
        except Exception as e:
72
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
73
            continue
74

  
75
if __name__ == "__main__":
76
    logging.basicConfig(level=logging.DEBUG)
77
    sys.exit(main())
78

  
79
# vim: set ts=4 sts=4 sw=4 et ai :

Also available in: Unified diff