Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ 02feca11

History | View | Annotate | Download (4.3 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Receive Ganeti events over 0mq, update VM state in DB.
6

7
This daemon receives job notifications from ganeti-0mqd
8
and updates VM state in the DB accordingly.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import zmq
23
import time
24
import json
25
import platform
26
import logging
27
import getpass
28
import traceback
29

    
30
from threading import Thread, Event, currentThread
31

    
32
from synnefo.db.models import VirtualMachine
33

    
34
from synnefo.logic import utils, backend
35

    
36
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py
37

    
38
class StoppableThread(Thread):
39
    """Thread class with a stop() method.
40
    
41
    The thread needs to check regularly for the stopped() condition.
42
    When it does, it exits, so that another thread may .join() it.
43

44
    """
45

    
46
    def __init__(self, *args, **kwargs):
47
        super(StoppableThread, self).__init__(*args, **kwargs)
48
        self._stop = Event()
49

    
50
    def stop(self):
51
        self._stop.set()
52

    
53
    def stopped(self):
54
        return self._stop.isSet()
55

    
56

    
57
def zmq_sub_thread(subscriber):
58
    while True:
59
        logging.debug("Entering 0mq to wait for message on SUB socket.")
60
        data = subscriber.recv()
61
        logging.debug("Received message on 0mq SUB socket.")
62
        try:
63
            msg = json.loads(data)
64

    
65
            if currentThread().stopped():
66
                logging.debug("Thread has been stopped, leaving request loop.")
67
                return
68

    
69
            if msg["type"] != "ganeti-op-status":
70
                logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
71
                continue
72

    
73
            vmid = utils.id_from_instance_name(msg["instance"])
74
            vm = VirtualMachine.objects.get(id=vmid)
75
    
76
            logging.debug("Processing msg: %s" % (msg,))
77
            backend.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
78
            vm.save()
79
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
80

    
81
        except KeyError:
82
            logging.error("Malformed incoming JSON, missing attributes: " + data)
83
        except VirtualMachine.InvalidBackendIdError:
84
            logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
85
        except VirtualMachine.DoesNotExist:
86
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
87
        except Exception as e:
88
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
89
            continue
90

    
91
def main():
92
    # Create an inproc PUB socket, for inter-thread communication
93
    zmqc = zmq.Context()
94
    inproc = zmqc.socket(zmq.PUB)
95
    inproc.bind("inproc://threads")
96

    
97
    #
98
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
99
    #
100
    subscriber = zmqc.socket(zmq.SUB)
101

    
102
    # Combine the hostname, username and a constant string to get
103
    # a hopefully unique identity for this 0mq peer.
104
    # Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
105
    # https://github.com/zeromq/zeromq2/issues/30
106
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
107
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
108
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
109
    subscriber.connect("inproc://threads")
110

    
111
    # Use a separate thread to process incoming messages,
112
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
113
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
114
    zmqt.start()
115

    
116
    try:
117
        logging.info("in main thread.");
118
        while True:
119
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
120
            time.sleep(600)
121
    except:
122
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
123
        
124
        #
125
        # Cleanup.
126
        #
127
        # Cancel the suscriber thread, wake it up, then join it.
128
        zmqt.stop()
129
        inproc.send_json({"type":"null"})
130
        zmqt.join()
131

    
132
        return 1
133

    
134
if __name__ == "__main__":
135
    logging.basicConfig(level=logging.DEBUG)
136
    sys.exit(main())
137

    
138
# vim: set ts=4 sts=4 sw=4 et ai :