Statistics
| Branch: | Tag: | Revision:

root / db / db_controller.py @ dd53338a

History | View | Annotate | Download (4.3 kB)

1 d08a5f6f Vangelis Koukis
#!/usr/bin/env python
2 87ace70f Vassilios Karakoidas
#
3 d08a5f6f Vangelis Koukis
# Copyright (c) 2010 Greek Research and Technology Network
4 7bd50624 Vassilios Karakoidas
#
5 d08a5f6f Vangelis Koukis
"""Receive Ganeti events over 0mq, update VM state in DB.
6 87ace70f Vassilios Karakoidas

7 d08a5f6f Vangelis Koukis
This daemon receives job notifications from ganeti-0mqd
8 d08a5f6f Vangelis Koukis
and updates VM state in the DB accordingly.
9 fcbc5bb3 Vassilios Karakoidas

10 d08a5f6f Vangelis Koukis
"""
11 87ace70f Vassilios Karakoidas
12 d08a5f6f Vangelis Koukis
from django.core.management import setup_environ
13 c99fe4c7 Vassilios Karakoidas
14 d08a5f6f Vangelis Koukis
import sys
15 86221fd5 Panos Louridas
import os
16 86221fd5 Panos Louridas
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17 86221fd5 Panos Louridas
sys.path.append(path)
18 a5c17ad3 Dimitris Moraitis
import synnefo.settings as settings
19 d08a5f6f Vangelis Koukis
20 d08a5f6f Vangelis Koukis
setup_environ(settings)
21 d08a5f6f Vangelis Koukis
22 d08a5f6f Vangelis Koukis
import zmq
23 d08a5f6f Vangelis Koukis
import time
24 d08a5f6f Vangelis Koukis
import json
25 5db87ed5 Vangelis Koukis
import platform
26 d08a5f6f Vangelis Koukis
import logging
27 c58091a6 Vangelis Koukis
import getpass
28 d08a5f6f Vangelis Koukis
import traceback
29 d08a5f6f Vangelis Koukis
30 5db87ed5 Vangelis Koukis
from threading import Thread, Event, currentThread
31 d08a5f6f Vangelis Koukis
from synnefo.db.models import VirtualMachine
32 c7b808db Dimitris Moraitis
from synnefo.settings import GANETI_ZMQ_PUBLISHER
33 234f8b07 Vangelis Koukis
from synnefo.logic import utils, backend
34 c99fe4c7 Vassilios Karakoidas
35 5db87ed5 Vangelis Koukis
class StoppableThread(Thread):
36 c92af313 Vangelis Koukis
    """Thread class with a stop() method.
37 5db87ed5 Vangelis Koukis
    
38 5db87ed5 Vangelis Koukis
    The thread needs to check regularly for the stopped() condition.
39 5db87ed5 Vangelis Koukis
    When it does, it exits, so that another thread may .join() it.
40 5db87ed5 Vangelis Koukis

41 5db87ed5 Vangelis Koukis
    """
42 5db87ed5 Vangelis Koukis
43 5db87ed5 Vangelis Koukis
    def __init__(self, *args, **kwargs):
44 5db87ed5 Vangelis Koukis
        super(StoppableThread, self).__init__(*args, **kwargs)
45 5db87ed5 Vangelis Koukis
        self._stop = Event()
46 5db87ed5 Vangelis Koukis
47 5db87ed5 Vangelis Koukis
    def stop(self):
48 5db87ed5 Vangelis Koukis
        self._stop.set()
49 5db87ed5 Vangelis Koukis
50 5db87ed5 Vangelis Koukis
    def stopped(self):
51 5db87ed5 Vangelis Koukis
        return self._stop.isSet()
52 5db87ed5 Vangelis Koukis
53 5db87ed5 Vangelis Koukis
54 21f59a5d Vangelis Koukis
def zmq_sub_thread(subscriber):
55 be7b8d37 Vassilios Karakoidas
    while True:
56 21f59a5d Vangelis Koukis
        logging.debug("Entering 0mq to wait for message on SUB socket.")
57 d08a5f6f Vangelis Koukis
        data = subscriber.recv()
58 21f59a5d Vangelis Koukis
        logging.debug("Received message on 0mq SUB socket.")
59 d08a5f6f Vangelis Koukis
        try:
60 d08a5f6f Vangelis Koukis
            msg = json.loads(data)
61 d08a5f6f Vangelis Koukis
62 5db87ed5 Vangelis Koukis
            if currentThread().stopped():
63 5db87ed5 Vangelis Koukis
                logging.debug("Thread has been stopped, leaving request loop.")
64 5db87ed5 Vangelis Koukis
                return
65 5db87ed5 Vangelis Koukis
66 d08a5f6f Vangelis Koukis
            if msg["type"] != "ganeti-op-status":
67 e3a99a08 Vassilios Karakoidas
                logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
68 d08a5f6f Vangelis Koukis
                continue
69 d08a5f6f Vangelis Koukis
70 e3a99a08 Vassilios Karakoidas
            vmid = utils.id_from_instance_name(msg["instance"])
71 d08a5f6f Vangelis Koukis
            vm = VirtualMachine.objects.get(id=vmid)
72 be7b8d37 Vassilios Karakoidas
    
73 e3a99a08 Vassilios Karakoidas
            logging.debug("Processing msg: %s" % (msg,))
74 234f8b07 Vangelis Koukis
            backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
75 d08a5f6f Vangelis Koukis
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
76 d08a5f6f Vangelis Koukis
77 d08a5f6f Vangelis Koukis
        except KeyError:
78 d08a5f6f Vangelis Koukis
            logging.error("Malformed incoming JSON, missing attributes: " + data)
79 d08a5f6f Vangelis Koukis
        except VirtualMachine.InvalidBackendIdError:
80 e3a99a08 Vassilios Karakoidas
            logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
81 d08a5f6f Vangelis Koukis
        except VirtualMachine.DoesNotExist:
82 d08a5f6f Vangelis Koukis
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
83 d08a5f6f Vangelis Koukis
        except Exception as e:
84 d08a5f6f Vangelis Koukis
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
85 d08a5f6f Vangelis Koukis
            continue
86 d08a5f6f Vangelis Koukis
87 21f59a5d Vangelis Koukis
def main():
88 5db87ed5 Vangelis Koukis
    # Create an inproc PUB socket, for inter-thread communication
89 21f59a5d Vangelis Koukis
    zmqc = zmq.Context()
90 5db87ed5 Vangelis Koukis
    inproc = zmqc.socket(zmq.PUB)
91 5db87ed5 Vangelis Koukis
    inproc.bind("inproc://threads")
92 5db87ed5 Vangelis Koukis
93 c58091a6 Vangelis Koukis
    #
94 5db87ed5 Vangelis Koukis
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
95 c58091a6 Vangelis Koukis
    #
96 21f59a5d Vangelis Koukis
    subscriber = zmqc.socket(zmq.SUB)
97 c58091a6 Vangelis Koukis
98 c58091a6 Vangelis Koukis
    # Combine the hostname, username and a constant string to get
99 c58091a6 Vangelis Koukis
    # a hopefully unique identity for this 0mq peer.
100 c58091a6 Vangelis Koukis
    # Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
101 c58091a6 Vangelis Koukis
    # https://github.com/zeromq/zeromq2/issues/30
102 c58091a6 Vangelis Koukis
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
103 21f59a5d Vangelis Koukis
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
104 21f59a5d Vangelis Koukis
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
105 5db87ed5 Vangelis Koukis
    subscriber.connect("inproc://threads")
106 21f59a5d Vangelis Koukis
107 5db87ed5 Vangelis Koukis
    # Use a separate thread to process incoming messages,
108 5db87ed5 Vangelis Koukis
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
109 5db87ed5 Vangelis Koukis
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
110 21f59a5d Vangelis Koukis
    zmqt.start()
111 21f59a5d Vangelis Koukis
112 21f59a5d Vangelis Koukis
    try:
113 5db87ed5 Vangelis Koukis
        logging.info("in main thread.");
114 21f59a5d Vangelis Koukis
        while True:
115 5db87ed5 Vangelis Koukis
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
116 5db87ed5 Vangelis Koukis
            time.sleep(600)
117 5db87ed5 Vangelis Koukis
    except:
118 5db87ed5 Vangelis Koukis
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
119 21f59a5d Vangelis Koukis
        
120 5db87ed5 Vangelis Koukis
        #
121 5db87ed5 Vangelis Koukis
        # Cleanup.
122 5db87ed5 Vangelis Koukis
        #
123 5db87ed5 Vangelis Koukis
        # Cancel the suscriber thread, wake it up, then join it.
124 5db87ed5 Vangelis Koukis
        zmqt.stop()
125 5db87ed5 Vangelis Koukis
        inproc.send_json({"type":"null"})
126 5db87ed5 Vangelis Koukis
        zmqt.join()
127 5db87ed5 Vangelis Koukis
128 5db87ed5 Vangelis Koukis
        return 1
129 5db87ed5 Vangelis Koukis
130 d08a5f6f Vangelis Koukis
if __name__ == "__main__":
131 d08a5f6f Vangelis Koukis
    logging.basicConfig(level=logging.DEBUG)
132 d08a5f6f Vangelis Koukis
    sys.exit(main())
133 d08a5f6f Vangelis Koukis
134 d08a5f6f Vangelis Koukis
# vim: set ts=4 sts=4 sw=4 et ai :