Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ 5db87ed5

History | View | Annotate | Download (4 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over 0mq, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-0mqd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import settings
19

    
20
setup_environ(settings)
21

    
22
import sys
23
import zmq
24
import time
25
import json
26
import platform
27
import logging
28
import traceback
29

    
30
from threading import Thread, Event, currentThread
31

    
32
from synnefo.db.models import VirtualMachine
33

    
34
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py
35

    
36
class StoppableThread(Thread):
37
    """Thread class with a stop() moethod.
38
    
39
    The thread needs to check regularly for the stopped() condition.
40
    When it does, it exits, so that another thread may .join() it.
41

42
    """
43

    
44
    def __init__(self, *args, **kwargs):
45
        super(StoppableThread, self).__init__(*args, **kwargs)
46
        self._stop = Event()
47

    
48
    def stop(self):
49
        self._stop.set()
50

    
51
    def stopped(self):
52
        return self._stop.isSet()
53

    
54

    
55
def zmq_sub_thread(subscriber):
56
    while True:
57
        logging.debug("Entering 0mq to wait for message on SUB socket.")
58
        data = subscriber.recv()
59
        logging.debug("Received message on 0mq SUB socket.")
60
        try:
61
            msg = json.loads(data)
62

    
63
            if currentThread().stopped():
64
                logging.debug("Thread has been stopped, leaving request loop.")
65
                return
66

    
67
            if msg["type"] != "ganeti-op-status":
68
                logging.debug("Ignoring message of uknown type %s." % (msg["type"]))
69
                continue
70

    
71
            vmid = VirtualMachine.id_from_instance_name(msg["instance"])
72
            vm = VirtualMachine.objects.get(id=vmid)
73
    
74
            logging.debug("Processing msg: %s" % (msg))
75
            vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
76
            vm.save()
77
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
78

    
79
        except KeyError:
80
            logging.error("Malformed incoming JSON, missing attributes: " + data)
81
        except VirtualMachine.InvalidBackendIdError:
82
            logging.debug("Ignoring msg for unknown instance %s." % msg["instance"])
83
        except VirtualMachine.DoesNotExist:
84
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
85
        except Exception as e:
86
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
87
            continue
88

    
89
def main():
90
    # Create an inproc PUB socket, for inter-thread communication
91
    zmqc = zmq.Context()
92
    inproc = zmqc.socket(zmq.PUB)
93
    inproc.bind("inproc://threads")
94

    
95
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
96
    subscriber = zmqc.socket(zmq.SUB)
97
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + "snf-db-controller")
98
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
99
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
100
    subscriber.connect("inproc://threads")
101

    
102
    # Use a separate thread to process incoming messages,
103
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
104
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
105
    zmqt.start()
106

    
107
    try:
108
        logging.info("in main thread.");
109
        while True:
110
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
111
            time.sleep(600)
112
    except:
113
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
114
        
115
        #
116
        # Cleanup.
117
        #
118
        # Cancel the suscriber thread, wake it up, then join it.
119
        zmqt.stop()
120
        inproc.send_json({"type":"null"})
121
        zmqt.join()
122

    
123
        return 1
124

    
125
if __name__ == "__main__":
126
    logging.basicConfig(level=logging.DEBUG)
127
    sys.exit(main())
128

    
129
# vim: set ts=4 sts=4 sw=4 et ai :