Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ df9dc46b

History | View | Annotate | Download (4 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 87ace70f Vassilios Karakoidas
#
3 d08a5f6f Vangelis Koukis
# Copyright (c) 2010 Greek Research and Technology Network
4 7bd50624 Vassilios Karakoidas
#
5 d08a5f6f Vangelis Koukis
"""Receive Ganeti events over 0mq, update VM state in DB.
6 87ace70f Vassilios Karakoidas

7 d08a5f6f Vangelis Koukis
This daemon receives job notifications from ganeti-0mqd
8 d08a5f6f Vangelis Koukis
and updates VM state in the DB accordingly.
9 fcbc5bb3 Vassilios Karakoidas

10 d08a5f6f Vangelis Koukis
"""
11 87ace70f Vassilios Karakoidas
12 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
13 c99fe4c7 Vassilios Karakoidas
14 d08a5f6f Vangelis Koukis
import sys
15 86221fd5 Panos Louridas
import os
16 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17 86221fd5 Panos Louridas
sys.path.append(path)
18 86221fd5 Panos Louridas
import settings
19 d08a5f6f Vangelis Koukis
20 d08a5f6f Vangelis Koukis
setup_environ(settings)
21 d08a5f6f Vangelis Koukis
22 d08a5f6f Vangelis Koukis
import sys
23 d08a5f6f Vangelis Koukis
import zmq
24 d08a5f6f Vangelis Koukis
import time
25 d08a5f6f Vangelis Koukis
import json
26 5db87ed5 Vangelis Koukis
import platform
27 d08a5f6f Vangelis Koukis
import logging
28 d08a5f6f Vangelis Koukis
import traceback
29 d08a5f6f Vangelis Koukis
30 5db87ed5 Vangelis Koukis
from threading import Thread, Event, currentThread
31 21f59a5d Vangelis Koukis
32 d08a5f6f Vangelis Koukis
from synnefo.db.models import VirtualMachine
33 d08a5f6f Vangelis Koukis
34 d08a5f6f Vangelis Koukis
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801" # FIXME: move to settings.py
35 c99fe4c7 Vassilios Karakoidas
36 5db87ed5 Vangelis Koukis
class StoppableThread(Thread):
37 5db87ed5 Vangelis Koukis
    """Thread class with a stop() moethod.
38 5db87ed5 Vangelis Koukis
    
39 5db87ed5 Vangelis Koukis
    The thread needs to check regularly for the stopped() condition.
40 5db87ed5 Vangelis Koukis
    When it does, it exits, so that another thread may .join() it.
41 5db87ed5 Vangelis Koukis

42 5db87ed5 Vangelis Koukis
    """
43 5db87ed5 Vangelis Koukis
44 5db87ed5 Vangelis Koukis
    def __init__(self, *args, **kwargs):
45 5db87ed5 Vangelis Koukis
        super(StoppableThread, self).__init__(*args, **kwargs)
46 5db87ed5 Vangelis Koukis
        self._stop = Event()
47 5db87ed5 Vangelis Koukis
48 5db87ed5 Vangelis Koukis
    def stop(self):
49 5db87ed5 Vangelis Koukis
        self._stop.set()
50 5db87ed5 Vangelis Koukis
51 5db87ed5 Vangelis Koukis
    def stopped(self):
52 5db87ed5 Vangelis Koukis
        return self._stop.isSet()
53 5db87ed5 Vangelis Koukis
54 5db87ed5 Vangelis Koukis
55 21f59a5d Vangelis Koukis
def zmq_sub_thread(subscriber):
56 be7b8d37 Vassilios Karakoidas
    while True:
57 21f59a5d Vangelis Koukis
        logging.debug("Entering 0mq to wait for message on SUB socket.")
58 d08a5f6f Vangelis Koukis
        data = subscriber.recv()
59 21f59a5d Vangelis Koukis
        logging.debug("Received message on 0mq SUB socket.")
60 d08a5f6f Vangelis Koukis
        try:
61 d08a5f6f Vangelis Koukis
            msg = json.loads(data)
62 d08a5f6f Vangelis Koukis
63 5db87ed5 Vangelis Koukis
            if currentThread().stopped():
64 5db87ed5 Vangelis Koukis
                logging.debug("Thread has been stopped, leaving request loop.")
65 5db87ed5 Vangelis Koukis
                return
66 5db87ed5 Vangelis Koukis
67 d08a5f6f Vangelis Koukis
            if msg["type"] != "ganeti-op-status":
68 d08a5f6f Vangelis Koukis
                logging.debug("Ignoring message of uknown type %s." % (msg["type"]))
69 d08a5f6f Vangelis Koukis
                continue
70 d08a5f6f Vangelis Koukis
71 d08a5f6f Vangelis Koukis
            vmid = VirtualMachine.id_from_instance_name(msg["instance"])
72 d08a5f6f Vangelis Koukis
            vm = VirtualMachine.objects.get(id=vmid)
73 be7b8d37 Vassilios Karakoidas
    
74 d08a5f6f Vangelis Koukis
            logging.debug("Processing msg: %s" % (msg))
75 d08a5f6f Vangelis Koukis
            vm.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
76 d08a5f6f Vangelis Koukis
            vm.save()
77 d08a5f6f Vangelis Koukis
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
78 d08a5f6f Vangelis Koukis
79 d08a5f6f Vangelis Koukis
        except KeyError:
80 d08a5f6f Vangelis Koukis
            logging.error("Malformed incoming JSON, missing attributes: " + data)
81 d08a5f6f Vangelis Koukis
        except VirtualMachine.InvalidBackendIdError:
82 d08a5f6f Vangelis Koukis
            logging.debug("Ignoring msg for unknown instance %s." % msg["instance"])
83 d08a5f6f Vangelis Koukis
        except VirtualMachine.DoesNotExist:
84 d08a5f6f Vangelis Koukis
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
85 d08a5f6f Vangelis Koukis
        except Exception as e:
86 d08a5f6f Vangelis Koukis
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
87 d08a5f6f Vangelis Koukis
            continue
88 d08a5f6f Vangelis Koukis
89 21f59a5d Vangelis Koukis
def main():
90 5db87ed5 Vangelis Koukis
    # Create an inproc PUB socket, for inter-thread communication
91 21f59a5d Vangelis Koukis
    zmqc = zmq.Context()
92 5db87ed5 Vangelis Koukis
    inproc = zmqc.socket(zmq.PUB)
93 5db87ed5 Vangelis Koukis
    inproc.bind("inproc://threads")
94 5db87ed5 Vangelis Koukis
95 5db87ed5 Vangelis Koukis
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
96 21f59a5d Vangelis Koukis
    subscriber = zmqc.socket(zmq.SUB)
97 5db87ed5 Vangelis Koukis
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + "snf-db-controller")
98 21f59a5d Vangelis Koukis
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
99 21f59a5d Vangelis Koukis
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
100 5db87ed5 Vangelis Koukis
    subscriber.connect("inproc://threads")
101 21f59a5d Vangelis Koukis
102 5db87ed5 Vangelis Koukis
    # Use a separate thread to process incoming messages,
103 5db87ed5 Vangelis Koukis
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
104 5db87ed5 Vangelis Koukis
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
105 21f59a5d Vangelis Koukis
    zmqt.start()
106 21f59a5d Vangelis Koukis
107 21f59a5d Vangelis Koukis
    try:
108 5db87ed5 Vangelis Koukis
        logging.info("in main thread.");
109 21f59a5d Vangelis Koukis
        while True:
110 5db87ed5 Vangelis Koukis
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
111 5db87ed5 Vangelis Koukis
            time.sleep(600)
112 5db87ed5 Vangelis Koukis
    except:
113 5db87ed5 Vangelis Koukis
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
114 21f59a5d Vangelis Koukis
        
115 5db87ed5 Vangelis Koukis
        #
116 5db87ed5 Vangelis Koukis
        # Cleanup.
117 5db87ed5 Vangelis Koukis
        #
118 5db87ed5 Vangelis Koukis
        # Cancel the suscriber thread, wake it up, then join it.
119 5db87ed5 Vangelis Koukis
        zmqt.stop()
120 5db87ed5 Vangelis Koukis
        inproc.send_json({"type":"null"})
121 5db87ed5 Vangelis Koukis
        zmqt.join()
122 5db87ed5 Vangelis Koukis
123 5db87ed5 Vangelis Koukis
        return 1
124 5db87ed5 Vangelis Koukis
125 d08a5f6f Vangelis Koukis
if __name__ == "__main__":
126 d08a5f6f Vangelis Koukis
    logging.basicConfig(level=logging.DEBUG)
127 d08a5f6f Vangelis Koukis
    sys.exit(main())
128 d08a5f6f Vangelis Koukis
129 d08a5f6f Vangelis Koukis
# vim: set ts=4 sts=4 sw=4 et ai :