Revision 5d081749 db/db_controller.py
b/db/db_controller.py | ||
---|---|---|
37 | 37 |
|
38 | 38 |
logger = None |
39 | 39 |
chan = None |
40 |
debug = False |
|
41 |
clienttags = [] |
|
40 | 42 |
|
41 | 43 |
def __init__(self, debug = False, logger = None): |
42 | 44 |
self.logger = logger |
43 |
self._init_queues(debug) |
|
45 |
self.debug = debug |
|
46 |
self._init() |
|
44 | 47 |
|
45 | 48 |
def update_db(self, message): |
46 | 49 |
try: |
... | ... | |
77 | 80 |
self.logger.debug("Request to update credits") |
78 | 81 |
message.channel.basic_ack(message.delivery_tag) |
79 | 82 |
|
83 |
def dummy_proc(self, message): |
|
84 |
try: |
|
85 |
msg = json.loads(message.body) |
|
86 |
self.logger.debug("Msg to %s (%s) " % message.channel, msg) |
|
87 |
finally: |
|
88 |
message.channel.basic_ack(message.delivery_tag) |
|
89 |
|
|
80 | 90 |
def wait(self): |
81 | 91 |
while True: |
82 | 92 |
try: |
83 | 93 |
self.chan.wait() |
84 | 94 |
except SystemExit: |
85 | 95 |
break |
96 |
except socket.error: |
|
97 |
self.logger.error("Server went away, reconnecting...") |
|
98 |
self._init() |
|
99 |
pass |
|
86 | 100 |
|
87 |
self.chan.basic_cancel("dbupdater")
|
|
101 |
[self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
|
|
88 | 102 |
self.chan.close() |
89 | 103 |
self.chan.connection.close() |
90 | 104 |
|
91 |
def _declare_queues(self): |
|
105 |
def _init(self): |
|
106 |
self._open_channel() |
|
92 | 107 |
|
93 | 108 |
for exchange in settings.EXCHANGES: |
94 |
self.chan.exchange_declare(exchange=exchange, type="direct", durable=True, auto_delete=False)
|
|
109 |
self.chan.exchange_declare(exchange=exchange, type="topic", durable=True, auto_delete=False)
|
|
95 | 110 |
|
96 | 111 |
for queue in settings.QUEUES: |
97 | 112 |
self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False) |
98 | 113 |
|
99 |
def _init_queues(self,debug): |
|
100 |
self._open_channel() |
|
101 |
if debug: |
|
102 |
self._init_devel() |
|
103 |
else: |
|
104 |
self._init() |
|
114 |
bindings = None |
|
105 | 115 |
|
106 |
def _init_devel(self): |
|
107 |
self._declare_queues() |
|
108 |
self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*") |
|
109 |
self.chan.basic_consume(queue="events", callback=self.update_db, consumer_tag="dbupdater") |
|
116 |
if self.debug: |
|
117 |
#Special queue handling, should not appear in production |
|
118 |
self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, exclusive=False, auto_delete=False) |
|
119 |
bindings = settings.BINDINGS_DEBUG |
|
120 |
else: |
|
121 |
bindings = settings.BINDINGS |
|
110 | 122 |
|
111 |
def _init(self): |
|
112 |
self._declare_queues() |
|
113 |
self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*") |
|
114 |
self.chan.basic_consume(queue="events", callback=self.update_db, consumer_tag="dbupdater") |
|
123 |
for binding in bindings: |
|
124 |
self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2]) |
|
125 |
tag = self.chan.basic_consume(queue=binding[0], callback=binding[3]) |
|
126 |
self.logger.debug("Binding %s on queue %s to %s" % (binding[2], binding[0], binding[3])) |
|
127 |
self.clienttags.append(tag) |
|
115 | 128 |
|
116 | 129 |
def _open_channel(self): |
117 | 130 |
conn = None |
... | ... | |
149 | 162 |
handler.setFormatter(formatter) |
150 | 163 |
logger.addHandler(handler) |
151 | 164 |
|
152 |
d = Dispatcher(debug = True, logger = logger)
|
|
165 |
d = Dispatcher(debug = opts.debug, logger = logger)
|
|
153 | 166 |
|
154 | 167 |
d.wait() |
155 | 168 |
|
... | ... | |
157 | 170 |
from optparse import OptionParser |
158 | 171 |
|
159 | 172 |
parser = OptionParser() |
160 |
parser.add_option("-d", "--debug", action="store_true", dest="debug",
|
|
173 |
parser.add_option("-d", "--debug", action="store_false", default=False, dest="debug",
|
|
161 | 174 |
help="Enable debug mode") |
162 | 175 |
parser.add_option("-l", "--log", dest="log_file", |
163 | 176 |
default=settings.DISPATCHER_LOG_FILE, |
Also available in: Unified diff