root / db / db_controller.py @ dfd19c2d
History | View | Annotate | Download (4.3 kB)
1 | d08a5f6f | Vangelis Koukis | #!/usr/bin/env python
|
---|---|---|---|
2 | 87ace70f | Vassilios Karakoidas | #
|
3 | d08a5f6f | Vangelis Koukis | # Copyright (c) 2010 Greek Research and Technology Network
|
4 | 7bd50624 | Vassilios Karakoidas | #
|
5 | d08a5f6f | Vangelis Koukis | """Receive Ganeti events over 0mq, update VM state in DB.
|
6 | 87ace70f | Vassilios Karakoidas |
|
7 | d08a5f6f | Vangelis Koukis | This daemon receives job notifications from ganeti-0mqd
|
8 | d08a5f6f | Vangelis Koukis | and updates VM state in the DB accordingly.
|
9 | fcbc5bb3 | Vassilios Karakoidas |
|
10 | d08a5f6f | Vangelis Koukis | """
|
11 | 87ace70f | Vassilios Karakoidas | |
12 | d08a5f6f | Vangelis Koukis | from django.core.management import setup_environ |
13 | c99fe4c7 | Vassilios Karakoidas | |
14 | d08a5f6f | Vangelis Koukis | import sys |
15 | 86221fd5 | Panos Louridas | import os |
16 | 86221fd5 | Panos Louridas | path = os.path.normpath(os.path.join(os.getcwd(), '..'))
|
17 | 86221fd5 | Panos Louridas | sys.path.append(path) |
18 | a5c17ad3 | Dimitris Moraitis | import synnefo.settings as settings |
19 | d08a5f6f | Vangelis Koukis | |
20 | d08a5f6f | Vangelis Koukis | setup_environ(settings) |
21 | d08a5f6f | Vangelis Koukis | |
22 | d08a5f6f | Vangelis Koukis | import zmq |
23 | d08a5f6f | Vangelis Koukis | import time |
24 | d08a5f6f | Vangelis Koukis | import json |
25 | 5db87ed5 | Vangelis Koukis | import platform |
26 | d08a5f6f | Vangelis Koukis | import logging |
27 | c58091a6 | Vangelis Koukis | import getpass |
28 | d08a5f6f | Vangelis Koukis | import traceback |
29 | d08a5f6f | Vangelis Koukis | |
30 | 5db87ed5 | Vangelis Koukis | from threading import Thread, Event, currentThread |
31 | 21f59a5d | Vangelis Koukis | |
32 | d08a5f6f | Vangelis Koukis | from synnefo.db.models import VirtualMachine |
33 | 02feca11 | Vassilios Karakoidas | |
34 | c7b808db | Dimitris Moraitis | from synnefo.settings import GANETI_ZMQ_PUBLISHER |
35 | d08a5f6f | Vangelis Koukis | |
36 | c63e332f | Vassilios Karakoidas | from logic import utils, backend |
37 | c99fe4c7 | Vassilios Karakoidas | |
38 | 5db87ed5 | Vangelis Koukis | class StoppableThread(Thread): |
39 | c92af313 | Vangelis Koukis | """Thread class with a stop() method.
|
40 | 5db87ed5 | Vangelis Koukis |
|
41 | 5db87ed5 | Vangelis Koukis | The thread needs to check regularly for the stopped() condition.
|
42 | 5db87ed5 | Vangelis Koukis | When it does, it exits, so that another thread may .join() it.
|
43 | 5db87ed5 | Vangelis Koukis |
|
44 | 5db87ed5 | Vangelis Koukis | """
|
45 | 5db87ed5 | Vangelis Koukis | |
46 | 5db87ed5 | Vangelis Koukis | def __init__(self, *args, **kwargs): |
47 | 5db87ed5 | Vangelis Koukis | super(StoppableThread, self).__init__(*args, **kwargs) |
48 | 5db87ed5 | Vangelis Koukis | self._stop = Event()
|
49 | 5db87ed5 | Vangelis Koukis | |
50 | 5db87ed5 | Vangelis Koukis | def stop(self): |
51 | 5db87ed5 | Vangelis Koukis | self._stop.set()
|
52 | 5db87ed5 | Vangelis Koukis | |
53 | 5db87ed5 | Vangelis Koukis | def stopped(self): |
54 | 5db87ed5 | Vangelis Koukis | return self._stop.isSet() |
55 | 5db87ed5 | Vangelis Koukis | |
56 | 5db87ed5 | Vangelis Koukis | |
57 | 21f59a5d | Vangelis Koukis | def zmq_sub_thread(subscriber): |
58 | be7b8d37 | Vassilios Karakoidas | while True: |
59 | 21f59a5d | Vangelis Koukis | logging.debug("Entering 0mq to wait for message on SUB socket.")
|
60 | d08a5f6f | Vangelis Koukis | data = subscriber.recv() |
61 | 21f59a5d | Vangelis Koukis | logging.debug("Received message on 0mq SUB socket.")
|
62 | d08a5f6f | Vangelis Koukis | try:
|
63 | d08a5f6f | Vangelis Koukis | msg = json.loads(data) |
64 | d08a5f6f | Vangelis Koukis | |
65 | 5db87ed5 | Vangelis Koukis | if currentThread().stopped():
|
66 | 5db87ed5 | Vangelis Koukis | logging.debug("Thread has been stopped, leaving request loop.")
|
67 | 5db87ed5 | Vangelis Koukis | return
|
68 | 5db87ed5 | Vangelis Koukis | |
69 | d08a5f6f | Vangelis Koukis | if msg["type"] != "ganeti-op-status": |
70 | e3a99a08 | Vassilios Karakoidas | logging.debug("Ignoring message of uknown type %s." % (msg["type"],)) |
71 | d08a5f6f | Vangelis Koukis | continue
|
72 | d08a5f6f | Vangelis Koukis | |
73 | e3a99a08 | Vassilios Karakoidas | vmid = utils.id_from_instance_name(msg["instance"])
|
74 | d08a5f6f | Vangelis Koukis | vm = VirtualMachine.objects.get(id=vmid) |
75 | be7b8d37 | Vassilios Karakoidas | |
76 | e3a99a08 | Vassilios Karakoidas | logging.debug("Processing msg: %s" % (msg,))
|
77 | 02feca11 | Vassilios Karakoidas | backend.process_backend_msg(msg["jobId"], msg["operation"], msg["status"], msg["logmsg"]) |
78 | d08a5f6f | Vangelis Koukis | vm.save() |
79 | d08a5f6f | Vangelis Koukis | logging.debug("Done processing msg for vm %s." % (msg["instance"])) |
80 | d08a5f6f | Vangelis Koukis | |
81 | d08a5f6f | Vangelis Koukis | except KeyError: |
82 | d08a5f6f | Vangelis Koukis | logging.error("Malformed incoming JSON, missing attributes: " + data)
|
83 | d08a5f6f | Vangelis Koukis | except VirtualMachine.InvalidBackendIdError:
|
84 | e3a99a08 | Vassilios Karakoidas | logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],)) |
85 | d08a5f6f | Vangelis Koukis | except VirtualMachine.DoesNotExist:
|
86 | d08a5f6f | Vangelis Koukis | logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid)) |
87 | d08a5f6f | Vangelis Koukis | except Exception as e: |
88 | d08a5f6f | Vangelis Koukis | logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
89 | d08a5f6f | Vangelis Koukis | continue
|
90 | d08a5f6f | Vangelis Koukis | |
91 | 21f59a5d | Vangelis Koukis | def main(): |
92 | 5db87ed5 | Vangelis Koukis | # Create an inproc PUB socket, for inter-thread communication
|
93 | 21f59a5d | Vangelis Koukis | zmqc = zmq.Context() |
94 | 5db87ed5 | Vangelis Koukis | inproc = zmqc.socket(zmq.PUB) |
95 | 5db87ed5 | Vangelis Koukis | inproc.bind("inproc://threads")
|
96 | 5db87ed5 | Vangelis Koukis | |
97 | c58091a6 | Vangelis Koukis | #
|
98 | 5db87ed5 | Vangelis Koukis | # Create a SUB socket, connect to ganeti-0mqd and the inproc PUB socket
|
99 | c58091a6 | Vangelis Koukis | #
|
100 | 21f59a5d | Vangelis Koukis | subscriber = zmqc.socket(zmq.SUB) |
101 | c58091a6 | Vangelis Koukis | |
102 | c58091a6 | Vangelis Koukis | # Combine the hostname, username and a constant string to get
|
103 | c58091a6 | Vangelis Koukis | # a hopefully unique identity for this 0mq peer.
|
104 | c58091a6 | Vangelis Koukis | # Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
|
105 | c58091a6 | Vangelis Koukis | # https://github.com/zeromq/zeromq2/issues/30
|
106 | c58091a6 | Vangelis Koukis | subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
|
107 | 21f59a5d | Vangelis Koukis | subscriber.setsockopt(zmq.SUBSCRIBE, "")
|
108 | 21f59a5d | Vangelis Koukis | subscriber.connect(GANETI_ZMQ_PUBLISHER) |
109 | 5db87ed5 | Vangelis Koukis | subscriber.connect("inproc://threads")
|
110 | 21f59a5d | Vangelis Koukis | |
111 | 5db87ed5 | Vangelis Koukis | # Use a separate thread to process incoming messages,
|
112 | 5db87ed5 | Vangelis Koukis | # needed because the Python runtime interacts badly with 0mq's blocking semantics.
|
113 | 5db87ed5 | Vangelis Koukis | zmqt = StoppableThread(target = zmq_sub_thread, args = (subscriber,)) |
114 | 21f59a5d | Vangelis Koukis | zmqt.start() |
115 | 21f59a5d | Vangelis Koukis | |
116 | 21f59a5d | Vangelis Koukis | try:
|
117 | 5db87ed5 | Vangelis Koukis | logging.info("in main thread.");
|
118 | 21f59a5d | Vangelis Koukis | while True: |
119 | 5db87ed5 | Vangelis Koukis | logging.info("When I grow up, I'll be syncing with Ganeti at this point.")
|
120 | 5db87ed5 | Vangelis Koukis | time.sleep(600)
|
121 | 5db87ed5 | Vangelis Koukis | except:
|
122 | 5db87ed5 | Vangelis Koukis | logging.error("Caught exception:\n" + "".join(traceback.format_exception(*sys.exc_info()))) |
123 | 21f59a5d | Vangelis Koukis | |
124 | 5db87ed5 | Vangelis Koukis | #
|
125 | 5db87ed5 | Vangelis Koukis | # Cleanup.
|
126 | 5db87ed5 | Vangelis Koukis | #
|
127 | 5db87ed5 | Vangelis Koukis | # Cancel the suscriber thread, wake it up, then join it.
|
128 | 5db87ed5 | Vangelis Koukis | zmqt.stop() |
129 | 5db87ed5 | Vangelis Koukis | inproc.send_json({"type":"null"}) |
130 | 5db87ed5 | Vangelis Koukis | zmqt.join() |
131 | 5db87ed5 | Vangelis Koukis | |
132 | 5db87ed5 | Vangelis Koukis | return 1 |
133 | 5db87ed5 | Vangelis Koukis | |
134 | d08a5f6f | Vangelis Koukis | if __name__ == "__main__": |
135 | d08a5f6f | Vangelis Koukis | logging.basicConfig(level=logging.DEBUG) |
136 | d08a5f6f | Vangelis Koukis | sys.exit(main()) |
137 | d08a5f6f | Vangelis Koukis | |
138 | d08a5f6f | Vangelis Koukis | # vim: set ts=4 sts=4 sw=4 et ai : |