Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ c92af313

History | View | Annotate | Download (4 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over 0mq, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-0mqd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import settings
19

    
20
setup_environ(settings)
21

    
22
import zmq
23
import time
24
import json
25
import platform
26
import logging
27
import traceback
28

    
29
from threading import Thread, Event, currentThread
30

    
31
from synnefo.db.models import VirtualMachine
32

    
33
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py
34

    
35
class StoppableThread(Thread):
36
    """Thread class with a stop() method.
37
    
38
    The thread needs to check regularly for the stopped() condition.
39
    When it does, it exits, so that another thread may .join() it.
40

41
    """
42

    
43
    def __init__(self, *args, **kwargs):
44
        super(StoppableThread, self).__init__(*args, **kwargs)
45
        self._stop = Event()
46

    
47
    def stop(self):
48
        self._stop.set()
49

    
50
    def stopped(self):
51
        return self._stop.isSet()
52

    
53

    
54
def zmq_sub_thread(subscriber):
55
    while True:
56
        logging.debug("Entering 0mq to wait for message on SUB socket.")
57
        data = subscriber.recv()
58
        logging.debug("Received message on 0mq SUB socket.")
59
        try:
60
            msg = json.loads(data)
61

    
62
            if currentThread().stopped():
63
                logging.debug("Thread has been stopped, leaving request loop.")
64
                return
65

    
66
            if msg["type"] != "ganeti-op-status":
67
                logging.debug("Ignoring message of uknown type %s." % (msg["type"]))
68
                continue
69

    
70
            vmid = VirtualMachine.id_from_instance_name(msg["instance"])
71
            vm = VirtualMachine.objects.get(id=vmid)
72
    
73
            logging.debug("Processing msg: %s" % (msg))
74
            vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
75
            vm.save()
76
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
77

    
78
        except KeyError:
79
            logging.error("Malformed incoming JSON, missing attributes: " + data)
80
        except VirtualMachine.InvalidBackendIdError:
81
            logging.debug("Ignoring msg for unknown instance %s." % msg["instance"])
82
        except VirtualMachine.DoesNotExist:
83
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
84
        except Exception as e:
85
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
86
            continue
87

    
88
def main():
89
    # Create an inproc PUB socket, for inter-thread communication
90
    zmqc = zmq.Context()
91
    inproc = zmqc.socket(zmq.PUB)
92
    inproc.bind("inproc://threads")
93

    
94
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
95
    subscriber = zmqc.socket(zmq.SUB)
96
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + "snf-db-controller")
97
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
98
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
99
    subscriber.connect("inproc://threads")
100

    
101
    # Use a separate thread to process incoming messages,
102
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
103
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
104
    zmqt.start()
105

    
106
    try:
107
        logging.info("in main thread.");
108
        while True:
109
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
110
            time.sleep(600)
111
    except:
112
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
113
        
114
        #
115
        # Cleanup.
116
        #
117
        # Cancel the suscriber thread, wake it up, then join it.
118
        zmqt.stop()
119
        inproc.send_json({"type":"null"})
120
        zmqt.join()
121

    
122
        return 1
123

    
124
if __name__ == "__main__":
125
    logging.basicConfig(level=logging.DEBUG)
126
    sys.exit(main())
127

    
128
# vim: set ts=4 sts=4 sw=4 et ai :