Revision 23c84263 db/db_controller.py

b/db/db_controller.py
21 21

  
22 22
from amqplib import client_0_8 as amqp
23 23

  
24
import json
24
import daemon
25
from signal import signal, SIGINT, SIGTERM, SIGKILL
26

  
25 27
import logging
26
import traceback
27 28
import time
28 29
import socket
29 30

  
30
import daemon
31
from signal import signal, SIGINT, SIGTERM, SIGKILL
32

  
33
from synnefo.db.models import VirtualMachine
34
from synnefo.logic import utils, backend
31
from synnefo.logic import dispatcher_callbacks
35 32

  
36 33
class Dispatcher:
37 34

  
......
45 42
        self.debug = debug
46 43
        self._init()
47 44

  
48
    def update_db(self, message):
49
        try:
50
            msg = json.loads(message.body)
51

  
52
            if msg["type"] != "ganeti-op-status":
53
                self.logger.error("Message is of uknown type %s." % (msg["type"],))
54
                return
55

  
56
            vmid = utils.id_from_instance_name(msg["instance"])
57
            vm = VirtualMachine.objects.get(id=vmid)
58

  
59
            self.logger.debug("Processing msg: %s" % (msg,))
60
            backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
61
            self.logger.debug("Done processing msg for vm %s." % (msg["instance"]))
62

  
63
        except KeyError:
64
            self.logger.error("Malformed incoming JSON, missing attributes: " + message.body)
65
        except VirtualMachine.InvalidBackendIdError:
66
            self.logger.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
67
        except VirtualMachine.DoesNotExist:
68
            self.logger.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
69
        except Exception as e:
70
            self.logger.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
71
            return
72
        finally:
73
            message.channel.basic_ack(message.delivery_tag)
74

  
75
    def send_email(self, message):
76
        self.logger.debug("Request to send email message")
77
        message.channel.basic_ack(message.delivery_tag)
78

  
79
    def update_credits(self, message):
80
        self.logger.debug("Request to update credits")
81
        message.channel.basic_ack(message.delivery_tag)
82

  
83
    def dummy_proc(self, message):
84
        try:
85
            msg = json.loads(message.body)
86
            self.logger.debug("Msg to %s (%s) " % message.channel, msg)
87
        finally:
88
            message.channel.basic_ack(message.delivery_tag)
89

  
90 45
    def wait(self):
91 46
        while True:
92 47
            try:
......
103 58
        self.chan.connection.close()
104 59

  
105 60
    def _init(self):
106
        self._open_channel()
61
        conn = None
62
        while conn == None:
63
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
64
            try:
65
                conn = amqp.Connection( host=settings.RABBIT_HOST,
66
                                    userid=settings.RABBIT_USERNAME,
67
                                    password=settings.RABBIT_PASSWORD,
68
                                    virtual_host=settings.RABBIT_VHOST)
69
            except socket.error:
70
                time.sleep(1)
71
                pass
72

  
73
        self.logger.info("Connection succesful, opening channel")
74
        self.chan = conn.channel()
107 75

  
76
        #Declare queues and exchanges
108 77
        for exchange in settings.EXCHANGES:
109 78
            self.chan.exchange_declare(exchange=exchange, type="topic", durable=True, auto_delete=False)
110 79

  
111 80
        for queue in settings.QUEUES:
112 81
            self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
113 82

  
114
        bindings = None
83
        bindings = settings.BINDINGS
115 84

  
116 85
        if self.debug:
117 86
            #Special queue handling, should not appear in production
118 87
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, exclusive=False, auto_delete=False)
119
            bindings = settings.BINDINGS_DEBUG
120
        else:
121
            bindings = settings.BINDINGS
88
            bindings += settings.BINDINGS_DEBUG
122 89

  
90
        #Bind queues to handler methods
123 91
        for binding in bindings:
124
            self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2])
125
            tag = self.chan.basic_consume(queue=binding[0], callback=binding[3])
126
            self.logger.debug("Binding %s on queue %s to %s" % (binding[2], binding[0], binding[3]))
127
            self.clienttags.append(tag)
128

  
129
    def _open_channel(self):
130
        conn = None
131
        while conn == None:
132
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
133 92
            try:
134
                conn = amqp.Connection( host=settings.RABBIT_HOST,
135
                                    userid=settings.RABBIT_USERNAME,
136
                                    password=settings.RABBIT_PASSWORD,
137
                                    virtual_host=settings.RABBIT_VHOST)
138
            except socket.error:
139
                time.sleep(1)
140
                pass
93
                cb = getattr(dispatcher_callbacks, binding[3])
94
            except AttributeError:
95
                self.logger.error("Cannot find callback %s" % binding[3])
141 96

  
142
        self.logger.info("Connection succesful, opening channel")
143
        self.chan = conn.channel()
97
            self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2])
98
            tag = self.chan.basic_consume(queue=binding[0], callback=cb)
99
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
100
                              (binding[1], binding[2], binding[0], binding[3]))
101
            self.clienttags.append(tag)
144 102

  
145 103
def exit_handler(signum, frame):
146 104
    global handler_logger
......
149 107
    raise SystemExit
150 108

  
151 109
def child(cmdline):
110
    global logger
152 111
    #Cmd line argument parsing
153 112
    (opts, args) = parse_arguments(cmdline)
154 113

  
......
170 129
    from optparse import OptionParser
171 130

  
172 131
    parser = OptionParser()
173
    parser.add_option("-d", "--debug", action="store_false", default=False, dest="debug",
132
    parser.add_option("-d", "--debug", action="store_true", default=False, dest="debug",
174 133
            help="Enable debug mode")
175 134
    parser.add_option("-l", "--log", dest="log_file",
176 135
            default=settings.DISPATCHER_LOG_FILE,
......
212 171
            print e.amqp_reply_code, " ", e.amqp_reply_text
213 172

  
214 173
def main():
215
    global logger
216 174
    (opts, args) = parse_arguments(sys.argv[1:])
217 175

  
218 176
    if opts.cleanup_queues:

Also available in: Unified diff