Revision da102335 db/db_controller.py

b/db/db_controller.py
2 2
#
3 3
# Copyright (c) 2010 Greek Research and Technology Network
4 4
#
5
"""Receive Ganeti events over 0mq, update VM state in DB.
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
6 6

  
7
This daemon receives job notifications from ganeti-0mqd
7
This daemon receives job notifications from ganeti-amqpd
8 8
and updates VM state in the DB accordingly.
9 9

  
10 10
"""
......
19 19

  
20 20
setup_environ(settings)
21 21

  
22
import zmq
23
import time
22
from amqplib import client_0_8 as amqp
23

  
24 24
import json
25
import platform
26 25
import logging
27
import getpass
28 26
import traceback
29 27

  
30
from threading import Thread, Event, currentThread
31 28
from synnefo.db.models import VirtualMachine
32
from synnefo.settings import GANETI_MASTER_IP, GANETI_0MQD_PUB_PORT
33 29
from synnefo.logic import utils, backend
34 30

  
35
class StoppableThread(Thread):
36
    """Thread class with a stop() method.
37
    
38
    The thread needs to check regularly for the stopped() condition.
39
    When it does, it exits, so that another thread may .join() it.
40

  
41
    """
42

  
43
    def __init__(self, *args, **kwargs):
44
        Thread.__init__(self, *args, **kwargs)
45
        self._stop = Event()
46

  
47
    def stop(self):
48
        self._stop.set()
31
def update_db(message):
32
    logging.debug("Received message from RabbitMQ")
33
    try:
34
        msg = json.loads(message.body)
35

  
36
        if msg["type"] != "ganeti-op-status":
37
            logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
38
            return
39

  
40
        vmid = utils.id_from_instance_name(msg["instance"])
41
        vm = VirtualMachine.objects.get(id=vmid)
42

  
43
        logging.debug("Processing msg: %s" % (msg,))
44
        backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
45
        logging.debug("Done processing msg for vm %s." % (msg["instance"]))
46

  
47
    except KeyError:
48
        logging.error("Malformed incoming JSON, missing attributes: " + message_data)
49
    except VirtualMachine.InvalidBackendIdError:
50
        logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
51
    except VirtualMachine.DoesNotExist:
52
        logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
53
    except Exception as e:
54
        logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
55
        return
56
    finally:
57
        message.channel.basic_ack(message.delivery_tag)
49 58

  
50
    def stopped(self):
51
        return self._stop.isSet()
59
def main():
52 60

  
61
    conn = amqp.Connection( host=settings.RABBIT_HOST,
62
                            userid=settings.RABBIT_USERNAME,
63
                            password=settings.RABBIT_PASSWORD,
64
                            virtual_host=settings.RABBIT_VHOST)
65
    chan = conn.channel()
66
    chan.queue_declare(queue="events", durable=True, exclusive=False, auto_delete=False)
67
    chan.exchange_declare(exchange="ganeti", type="direct", durable=True,
68
            auto_delete=False)
69
    chan.queue_bind(queue="events", exchange="ganeti", routing_key="eventd")
70
    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
53 71

  
54
def zmq_sub_thread(subscriber):
55
    logging.error("Entering 0mq to wait for message on SUB socket.")
56 72
    while True:
57
        logging.debug("Entering 0mq to wait for message on SUB socket.")
58
        data = subscriber.recv()
59
        logging.debug("Received message on 0mq SUB socket.")
60
        try:
61
            msg = json.loads(data)
62

  
63
            if currentThread().stopped():
64
                logging.debug("Thread has been stopped, leaving request loop.")
65
                return
66

  
67
            if msg["type"] != "ganeti-op-status":
68
                logging.debug("Ignoring message of uknown type %s." % (msg["type"],))
69
                continue
70

  
71
            vmid = utils.id_from_instance_name(msg["instance"])
72
            vm = VirtualMachine.objects.get(id=vmid)
73
    
74
            logging.debug("Processing msg: %s" % (msg,))
75
            backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
76
            logging.debug("Done processing msg for vm %s." % (msg["instance"]))
77

  
78
        except KeyError:
79
            logging.error("Malformed incoming JSON, missing attributes: " + data)
80
        except VirtualMachine.InvalidBackendIdError:
81
            logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
82
        except VirtualMachine.DoesNotExist:
83
            logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
84
        except Exception as e:
85
            logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
86
            continue
87

  
88
def main():
89
    # Create an inproc PUB socket, for inter-thread communication
90
    zmqc = zmq.Context()
91
    inproc = zmqc.socket(zmq.PUB)
92
    inproc.bind("inproc://threads")
93

  
94
    #
95
    # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
96
    #
97
    subscriber = zmqc.socket(zmq.SUB)
98

  
99
    # Combine the hostname, username and a constant string to get
100
    # a hopefully unique identity for this 0mq peer.
101
    # Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
102
    # https://github.com/zeromq/zeromq2/issues/30
103
    GANETI_ZMQ_PUBLISHER = "tcp://%s:%d" % (GANETI_MASTER_IP, int(GANETI_0MQD_PUB_PORT))
104
    subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
105
    subscriber.setsockopt(zmq.SUBSCRIBE, "")
106
    subscriber.connect(GANETI_ZMQ_PUBLISHER)
107
    subscriber.connect("inproc://threads")
108

  
109
    # Use a separate thread to process incoming messages,
110
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
111
    zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,))
112
    zmqt.start()
73
        chan.wait()
113 74

  
114
    try:
115
        logging.info("in main thread.");
116
        while True:
117
            logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
118
            time.sleep(600)
119
    except:
120
        logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info())))
121
        
122
        #
123
        # Cleanup.
124
        #
125
        # Cancel the suscriber thread, wake it up, then join it.
126
        zmqt.stop()
127
        inproc.send_json({"type":"null"})
128
        zmqt.join()
129

  
130
        return 1
75
    chan.basic_cancel("dbupdater")
76
    chan.close()
77
    conn.close()
78
#TODO: Implement proper shutdown of channel
131 79

  
132 80
if __name__ == "__main__":
133 81
    logging.basicConfig(level=logging.DEBUG)

Also available in: Unified diff