Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ dfd19c2d

History | View | Annotate | Download (4.3 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 87ace70f Vassilios Karakoidas
#
3 d08a5f6f Vangelis Koukis
# Copyright (c) 2010 Greek Research and Technology Network
4 7bd50624 Vassilios Karakoidas
#
5 d08a5f6f Vangelis Koukis
"""Receive Ganeti events over 0mq, update VM state in DB.
6 87ace70f Vassilios Karakoidas

7 d08a5f6f Vangelis Koukis
This daemon receives job notifications from ganeti-0mqd
8 d08a5f6f Vangelis Koukis
and updates VM state in the DB accordingly.
9 fcbc5bb3 Vassilios Karakoidas

10 d08a5f6f Vangelis Koukis
"""
11 87ace70f Vassilios Karakoidas
12 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
13 c99fe4c7 Vassilios Karakoidas
14 d08a5f6f Vangelis Koukis
import sys
15 86221fd5 Panos Louridas
import os
16 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17 86221fd5 Panos Louridas
sys.path.append(path)
18 a5c17ad3 Dimitris Moraitis
import synnefo.settings as settings
19 d08a5f6f Vangelis Koukis
20 d08a5f6f Vangelis Koukis
setup_environ(settings)
21 d08a5f6f Vangelis Koukis
22 d08a5f6f Vangelis Koukis
import zmq
23 d08a5f6f Vangelis Koukis
import time
24 d08a5f6f Vangelis Koukis
import json
25 5db87ed5 Vangelis Koukis
import platform
26 d08a5f6f Vangelis Koukis
import logging
27 c58091a6 Vangelis Koukis
import getpass
28 d08a5f6f Vangelis Koukis
import traceback
29 d08a5f6f Vangelis Koukis
30 5db87ed5 Vangelis Koukis
from threading import Thread, Event, currentThread
31 21f59a5d Vangelis Koukis
32 d08a5f6f Vangelis Koukis
from synnefo.db.models import VirtualMachine
33 02feca11 Vassilios Karakoidas
34 c7b808db Dimitris Moraitis
from synnefo.settings import GANETI_ZMQ_PUBLISHER
35 d08a5f6f Vangelis Koukis
36 c63e332f Vassilios Karakoidas
from logic import utils, backend
37 c99fe4c7 Vassilios Karakoidas
38 5db87ed5 Vangelis Koukis
class StoppableThread(Thread):
39 c92af313 Vangelis Koukis
    """Thread class with a stop() method.
40 5db87ed5 Vangelis Koukis
    
41 5db87ed5 Vangelis Koukis
    The thread needs to check regularly for the stopped() condition.
42 5db87ed5 Vangelis Koukis
    When it does, it exits, so that another thread may .join() it.
43 5db87ed5 Vangelis Koukis

44 5db87ed5 Vangelis Koukis
    """
45 5db87ed5 Vangelis Koukis
46 5db87ed5 Vangelis Koukis
    def __init__(self, *args, **kwargs):
47 5db87ed5 Vangelis Koukis
        super(StoppableThread, self).__init__(*args, **kwargs)
48 5db87ed5 Vangelis Koukis
        self._stop = Event()
49 5db87ed5 Vangelis Koukis
50 5db87ed5 Vangelis Koukis
    def stop(self):
51 5db87ed5 Vangelis Koukis
        self._stop.set()
52 5db87ed5 Vangelis Koukis
53 5db87ed5 Vangelis Koukis
    def stopped(self):
54 5db87ed5 Vangelis Koukis
        return self._stop.isSet()
55 5db87ed5 Vangelis Koukis
56 5db87ed5 Vangelis Koukis
57 21f59a5d Vangelis Koukis
def zmq_sub_thread(subscriber):
58 be7b8d37 Vassilios Karakoidas
    while True:
59 21f59a5d Vangelis Koukis
        logging.debug("Entering 0mq to wait for message on SUB socket.")
60 d08a5f6f Vangelis Koukis
        data = subscriber.recv()
61 21f59a5d Vangelis Koukis
        logging.debug("Received message on 0mq SUB socket.")
62 d08a5f6f Vangelis Koukis
        try:
63 d08a5f6f Vangelis Koukis
            msg = json.loads(data)
64 d08a5f6f Vangelis Koukis
65 5db87ed5 Vangelis Koukis
            if currentThread().stopped():
66 5db87ed5 Vangelis Koukis
                logging.debug("Thread has been stopped, leaving request loop.")
67 5db87ed5 Vangelis Koukis
                return
68 5db87ed5 Vangelis Koukis
69 d08a5f6f Vangelis Koukis
            if msg["type"] != "ganeti-op-status":
70 e3a99a08 Vassilios Karakoidas
                logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
71 d08a5f6f Vangelis Koukis
                continue
72 d08a5f6f Vangelis Koukis
73 e3a99a08 Vassilios Karakoidas
            vmid = utils.id_from_instance_name(msg["instance"])
74 d08a5f6f Vangelis Koukis
            vm = VirtualMachine.objects.get(id=vmid)
75 be7b8d37 Vassilios Karakoidas
    
76 e3a99a08 Vassilios Karakoidas
            logging.debug("Processing msg: %s" % (msg,))
77 02feca11 Vassilios Karakoidas
            backend.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
78 d08a5f6f Vangelis Koukis
            vm.save()
79 d08a5f6f Vangelis Koukis
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
80 d08a5f6f Vangelis Koukis
81 d08a5f6f Vangelis Koukis
        except KeyError:
82 d08a5f6f Vangelis Koukis
            logging.error("Malformed incoming JSON, missing attributes: " + data)
83 d08a5f6f Vangelis Koukis
        except VirtualMachine.InvalidBackendIdError:
84 e3a99a08 Vassilios Karakoidas
            logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
85 d08a5f6f Vangelis Koukis
        except VirtualMachine.DoesNotExist:
86 d08a5f6f Vangelis Koukis
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
87 d08a5f6f Vangelis Koukis
        except Exception as e:
88 d08a5f6f Vangelis Koukis
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
89 d08a5f6f Vangelis Koukis
            continue
90 d08a5f6f Vangelis Koukis
91 21f59a5d Vangelis Koukis
def main():
92 5db87ed5 Vangelis Koukis
    # Create an inproc PUB socket, for inter-thread communication
93 21f59a5d Vangelis Koukis
    zmqc = zmq.Context()
94 5db87ed5 Vangelis Koukis
    inproc = zmqc.socket(zmq.PUB)
95 5db87ed5 Vangelis Koukis
    inproc.bind("inproc://threads")
96 5db87ed5 Vangelis Koukis
97 c58091a6 Vangelis Koukis
    #
98 5db87ed5 Vangelis Koukis
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
99 c58091a6 Vangelis Koukis
    #
100 21f59a5d Vangelis Koukis
    subscriber = zmqc.socket(zmq.SUB)
101 c58091a6 Vangelis Koukis
102 c58091a6 Vangelis Koukis
    # Combine the hostname, username and a constant string to get
103 c58091a6 Vangelis Koukis
    # a hopefully unique identity for this 0mq peer.
104 c58091a6 Vangelis Koukis
    # Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
105 c58091a6 Vangelis Koukis
    # https://github.com/zeromq/zeromq2/issues/30
106 c58091a6 Vangelis Koukis
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
107 21f59a5d Vangelis Koukis
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
108 21f59a5d Vangelis Koukis
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
109 5db87ed5 Vangelis Koukis
    subscriber.connect("inproc://threads")
110 21f59a5d Vangelis Koukis
111 5db87ed5 Vangelis Koukis
    # Use a separate thread to process incoming messages,
112 5db87ed5 Vangelis Koukis
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
113 5db87ed5 Vangelis Koukis
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
114 21f59a5d Vangelis Koukis
    zmqt.start()
115 21f59a5d Vangelis Koukis
116 21f59a5d Vangelis Koukis
    try:
117 5db87ed5 Vangelis Koukis
        logging.info("in main thread.");
118 21f59a5d Vangelis Koukis
        while True:
119 5db87ed5 Vangelis Koukis
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
120 5db87ed5 Vangelis Koukis
            time.sleep(600)
121 5db87ed5 Vangelis Koukis
    except:
122 5db87ed5 Vangelis Koukis
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
123 21f59a5d Vangelis Koukis
        
124 5db87ed5 Vangelis Koukis
        #
125 5db87ed5 Vangelis Koukis
        # Cleanup.
126 5db87ed5 Vangelis Koukis
        #
127 5db87ed5 Vangelis Koukis
        # Cancel the suscriber thread, wake it up, then join it.
128 5db87ed5 Vangelis Koukis
        zmqt.stop()
129 5db87ed5 Vangelis Koukis
        inproc.send_json({"type":"null"})
130 5db87ed5 Vangelis Koukis
        zmqt.join()
131 5db87ed5 Vangelis Koukis
132 5db87ed5 Vangelis Koukis
        return 1
133 5db87ed5 Vangelis Koukis
134 d08a5f6f Vangelis Koukis
if __name__ == "__main__":
135 d08a5f6f Vangelis Koukis
    logging.basicConfig(level=logging.DEBUG)
136 d08a5f6f Vangelis Koukis
    sys.exit(main())
137 d08a5f6f Vangelis Koukis
138 d08a5f6f Vangelis Koukis
# vim: set ts=4 sts=4 sw=4 et ai :