root / db / db_controller.py @ 432fc8c3
History | View | Annotate | Download (4.3 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 87ace70f | Vassilios Karakoidas | #
|
3 | d08a5f6f | Vangelis Koukis | # Copyright (c) 2010 Greek Research and Technology Network
|
4 | 7bd50624 | Vassilios Karakoidas | #
|
5 | d08a5f6f | Vangelis Koukis | """Receive Ganeti events over 0mq, update VM state in DB.
|
6 | 87ace70f | Vassilios Karakoidas |
|
7 | d08a5f6f | Vangelis Koukis | This daemon receives job notifications from ganeti-0mqd
|
8 | d08a5f6f | Vangelis Koukis | and updates VM state in the DB accordingly.
|
9 | fcbc5bb3 | Vassilios Karakoidas |
|
10 | d08a5f6f | Vangelis Koukis | """
|
11 | 87ace70f | Vassilios Karakoidas | |
12 | d08a5f6f | Vangelis Koukis | from django.core.management import setup_environ |
13 | c99fe4c7 | Vassilios Karakoidas | |
14 | d08a5f6f | Vangelis Koukis | import sys |
15 | 86221fd5 | Panos Louridas | import os |
16 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
17 | 86221fd5 | Panos Louridas | sys.path.append(path) |
18 | a5c17ad3 | Dimitris Moraitis | import synnefo.settings as settings |
19 | d08a5f6f | Vangelis Koukis | |
20 | d08a5f6f | Vangelis Koukis | setup_environ(settings) |
21 | d08a5f6f | Vangelis Koukis | |
22 | d08a5f6f | Vangelis Koukis | import zmq |
23 | d08a5f6f | Vangelis Koukis | import time |
24 | d08a5f6f | Vangelis Koukis | import json |
25 | 5db87ed5 | Vangelis Koukis | import platform |
26 | d08a5f6f | Vangelis Koukis | import logging |
27 | c58091a6 | Vangelis Koukis | import getpass |
28 | d08a5f6f | Vangelis Koukis | import traceback |
29 | d08a5f6f | Vangelis Koukis | |
30 | 5db87ed5 | Vangelis Koukis | from threading import Thread, Event, currentThread |
31 | d08a5f6f | Vangelis Koukis | from synnefo.db.models import VirtualMachine |
32 | c7b808db | Dimitris Moraitis | from synnefo.settings import GANETI_ZMQ_PUBLISHER |
33 | 234f8b07 | Vangelis Koukis | from synnefo.logic import utils, backend |
34 | c99fe4c7 | Vassilios Karakoidas | |
35 | 5db87ed5 | Vangelis Koukis | class StoppableThread(Thread): |
36 | c92af313 | Vangelis Koukis | """Thread class with a stop() method.
|
37 | 5db87ed5 | Vangelis Koukis |
|
38 | 5db87ed5 | Vangelis Koukis | The thread needs to check regularly for the stopped() condition.
|
39 | 5db87ed5 | Vangelis Koukis | When it does, it exits, so that another thread may .join() it.
|
40 | 5db87ed5 | Vangelis Koukis |
|
41 | 5db87ed5 | Vangelis Koukis | """
|
42 | 5db87ed5 | Vangelis Koukis | |
43 | 5db87ed5 | Vangelis Koukis | def __init__(self, *args, **kwargs): |
44 | 5db87ed5 | Vangelis Koukis | super(StoppableThread, self).__init__(*args, **kwargs) |
45 | 5db87ed5 | Vangelis Koukis | self._stop = Event()
|
46 | 5db87ed5 | Vangelis Koukis | |
47 | 5db87ed5 | Vangelis Koukis | def stop(self): |
48 | 5db87ed5 | Vangelis Koukis | self._stop.set()
|
49 | 5db87ed5 | Vangelis Koukis | |
50 | 5db87ed5 | Vangelis Koukis | def stopped(self): |
51 | 5db87ed5 | Vangelis Koukis | return self._stop.isSet() |
52 | 5db87ed5 | Vangelis Koukis | |
53 | 5db87ed5 | Vangelis Koukis | |
54 | 21f59a5d | Vangelis Koukis | def zmq_sub_thread(subscriber): |
55 | be7b8d37 | Vassilios Karakoidas | while True: |
56 | 21f59a5d | Vangelis Koukis | logging.debug("Entering 0mq to wait for message on SUB socket.")
|
57 | d08a5f6f | Vangelis Koukis | data = subscriber.recv() |
58 | 21f59a5d | Vangelis Koukis | logging.debug("Received message on 0mq SUB socket.")
|
59 | d08a5f6f | Vangelis Koukis | try:
|
60 | d08a5f6f | Vangelis Koukis | msg = json.loads(data) |
61 | d08a5f6f | Vangelis Koukis | |
62 | 5db87ed5 | Vangelis Koukis | if currentThread().stopped():
|
63 | 5db87ed5 | Vangelis Koukis | logging.debug("Thread has been stopped, leaving request loop.")
|
64 | 5db87ed5 | Vangelis Koukis | return
|
65 | 5db87ed5 | Vangelis Koukis | |
66 | d08a5f6f | Vangelis Koukis | if msg["type"] != "ganeti-op-status": |
67 | e3a99a08 | Vassilios Karakoidas | logging.debug("Ignoring message of uknown type %s." % (msg["type"],)) |
68 | d08a5f6f | Vangelis Koukis | continue
|
69 | d08a5f6f | Vangelis Koukis | |
70 | e3a99a08 | Vassilios Karakoidas | vmid = utils.id_from_instance_name(msg["instance"])
|
71 | d08a5f6f | Vangelis Koukis | vm = VirtualMachine.objects.get(id=vmid) |
72 | be7b8d37 | Vassilios Karakoidas | |
73 | e3a99a08 | Vassilios Karakoidas | logging.debug("Processing msg: %s" % (msg,))
|
74 | 234f8b07 | Vangelis Koukis | backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
75 | d08a5f6f | Vangelis Koukis | logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
76 | d08a5f6f | Vangelis Koukis | |
77 | d08a5f6f | Vangelis Koukis | except KeyError: |
78 | d08a5f6f | Vangelis Koukis | logging.error("Malformed incoming JSON, missing attributes: " + data)
|
79 | d08a5f6f | Vangelis Koukis | except VirtualMachine.InvalidBackendIdError:
|
80 | e3a99a08 | Vassilios Karakoidas | logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],)) |
81 | d08a5f6f | Vangelis Koukis | except VirtualMachine.DoesNotExist:
|
82 | d08a5f6f | Vangelis Koukis | logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
83 | d08a5f6f | Vangelis Koukis | except Exception as e: |
84 | d08a5f6f | Vangelis Koukis | logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
85 | d08a5f6f | Vangelis Koukis | continue
|
86 | d08a5f6f | Vangelis Koukis | |
87 | 21f59a5d | Vangelis Koukis | def main(): |
88 | 5db87ed5 | Vangelis Koukis | # Create an inproc PUB socket, for inter-thread communication
|
89 | 21f59a5d | Vangelis Koukis | zmqc = zmq.Context() |
90 | 5db87ed5 | Vangelis Koukis | inproc = zmqc.socket(zmq.PUB) |
91 | 5db87ed5 | Vangelis Koukis | inproc.bind("inproc://threads")
|
92 | 5db87ed5 | Vangelis Koukis | |
93 | c58091a6 | Vangelis Koukis | #
|
94 | 5db87ed5 | Vangelis Koukis | # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
|
95 | c58091a6 | Vangelis Koukis | #
|
96 | 21f59a5d | Vangelis Koukis | subscriber = zmqc.socket(zmq.SUB) |
97 | c58091a6 | Vangelis Koukis | |
98 | c58091a6 | Vangelis Koukis | # Combine the hostname, username and a constant string to get
|
99 | c58091a6 | Vangelis Koukis | # a hopefully unique identity for this 0mq peer.
|
100 | c58091a6 | Vangelis Koukis | # Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
|
101 | c58091a6 | Vangelis Koukis | # https://github.com/zeromq/zeromq2/issues/30
|
102 | c58091a6 | Vangelis Koukis | subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
|
103 | 21f59a5d | Vangelis Koukis | subscriber.setsockopt(zmq.SUBSCRIBE, "")
|
104 | 21f59a5d | Vangelis Koukis | subscriber.connect(GANETI_ZMQ_PUBLISHER) |
105 | 5db87ed5 | Vangelis Koukis | subscriber.connect("inproc://threads")
|
106 | 21f59a5d | Vangelis Koukis | |
107 | 5db87ed5 | Vangelis Koukis | # Use a separate thread to process incoming messages,
|
108 | 5db87ed5 | Vangelis Koukis | # needed because the Python runtime interacts badly with 0mq's blocking semantics.
|
109 | 5db87ed5 | Vangelis Koukis | zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,)) |
110 | 21f59a5d | Vangelis Koukis | zmqt.start() |
111 | 21f59a5d | Vangelis Koukis | |
112 | 21f59a5d | Vangelis Koukis | try:
|
113 | 5db87ed5 | Vangelis Koukis | logging.info("in main thread.");
|
114 | 21f59a5d | Vangelis Koukis | while True: |
115 | 5db87ed5 | Vangelis Koukis | logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
|
116 | 5db87ed5 | Vangelis Koukis | time.sleep(600)
|
117 | 5db87ed5 | Vangelis Koukis | except:
|
118 | 5db87ed5 | Vangelis Koukis | logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
119 | 21f59a5d | Vangelis Koukis | |
120 | 5db87ed5 | Vangelis Koukis | #
|
121 | 5db87ed5 | Vangelis Koukis | # Cleanup.
|
122 | 5db87ed5 | Vangelis Koukis | #
|
123 | 5db87ed5 | Vangelis Koukis | # Cancel the suscriber thread, wake it up, then join it.
|
124 | 5db87ed5 | Vangelis Koukis | zmqt.stop() |
125 | 5db87ed5 | Vangelis Koukis | inproc.send_json({"type":"null"}) |
126 | 5db87ed5 | Vangelis Koukis | zmqt.join() |
127 | 5db87ed5 | Vangelis Koukis | |
128 | 5db87ed5 | Vangelis Koukis | return 1 |
129 | 5db87ed5 | Vangelis Koukis | |
130 | d08a5f6f | Vangelis Koukis | if __name__ == "__main__": |
131 | d08a5f6f | Vangelis Koukis | logging.basicConfig(level=logging.DEBUG) |
132 | d08a5f6f | Vangelis Koukis | sys.exit(main()) |
133 | d08a5f6f | Vangelis Koukis | |
134 | d08a5f6f | Vangelis Koukis | # vim: set ts=4 sts=4 sw=4 et ai : |