Revision f30730c0 db/db_controller.py
b/db/db_controller.py | ||
---|---|---|
38 | 38 |
logger = None |
39 | 39 |
chan = None |
40 | 40 |
|
41 |
def __init__(self, debug, logger):
|
|
41 |
def __init__(self, debug = False, logger = None):
|
|
42 | 42 |
self.logger = logger |
43 | 43 |
self._init_queues(debug) |
44 | 44 |
|
... | ... | |
89 | 89 |
self.chan.connection.close() |
90 | 90 |
|
91 | 91 |
def _declare_queues(self): |
92 |
self.chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="direct", durable=True, auto_delete=False) |
|
93 |
self.chan.exchange_declare(exchange=settings.EXCHANGE_CRON, type="topic", durable=True, auto_delete=False) |
|
94 |
self.chan.exchange_declare(exchange=settings.EXCHANGE_API, type="topic", durable=True, auto_delete=False) |
|
95 | 92 |
|
96 |
self.chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False) |
|
97 |
self.chan.queue_declare(queue=settings.QUEUE_CRON_CREDITS, durable=True, exclusive=False, auto_delete=False) |
|
98 |
self.chan.queue_declare(queue=settings.QUEUE_API_EMAIL, durable=True, exclusive=False, auto_delete=False) |
|
99 |
self.chan.queue_declare(queue=settings.QUEUE_CRON_EMAIL, durable=True, exclusive=False, auto_delete=False) |
|
93 |
for exchange in settings.EXCHANGES: |
|
94 |
self.chan.exchange_declare(exchange=exchange, type="direct", durable=True, auto_delete=False) |
|
95 |
|
|
96 |
for queue in settings.QUEUES: |
|
97 |
self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False) |
|
100 | 98 |
|
101 | 99 |
def _init_queues(self,debug): |
102 | 100 |
self._open_channel() |
... | ... | |
166 | 164 |
metavar="FILE", |
167 | 165 |
help="Write log to FILE instead of %s" % |
168 | 166 |
settings.DISPATCHER_LOG_FILE) |
169 |
|
|
167 |
parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues", |
|
168 |
help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)") |
|
169 |
|
|
170 | 170 |
return parser.parse_args(args) |
171 | 171 |
|
172 |
def cleanup_queues() : |
|
173 |
|
|
174 |
conn = amqp.Connection( host=settings.RABBIT_HOST, |
|
175 |
userid=settings.RABBIT_USERNAME, |
|
176 |
password=settings.RABBIT_PASSWORD, |
|
177 |
virtual_host=settings.RABBIT_VHOST) |
|
178 |
chan = conn.channel() |
|
179 |
|
|
180 |
print "Queues to be deleted: ", settings.QUEUES |
|
181 |
print "Exchnages to be deleted: ", settings.EXCHANGES |
|
182 |
ans = raw_input("Are you sure (N/y):") |
|
183 |
|
|
184 |
if not ans: |
|
185 |
return |
|
186 |
if ans not in ['Y', 'y']: |
|
187 |
return |
|
188 |
|
|
189 |
for exchange in settings.EXCHANGES: |
|
190 |
try: |
|
191 |
chan.exchange_delete(exchange=exchange) |
|
192 |
except amqp.exceptions.AMQPChannelException as e: |
|
193 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
|
194 |
|
|
195 |
for queue in settings.QUEUES: |
|
196 |
try: |
|
197 |
chan.queue_delete(queue=queue) |
|
198 |
except amqp.exceptions.AMQPChannelException as e: |
|
199 |
print e.amqp_reply_code, " ", e.amqp_reply_text |
|
200 |
|
|
172 | 201 |
def main(): |
173 | 202 |
global logger |
174 | 203 |
(opts, args) = parse_arguments(sys.argv[1:]) |
175 | 204 |
|
205 |
if opts.cleanup_queues: |
|
206 |
cleanup_queues() |
|
207 |
return |
|
208 |
|
|
176 | 209 |
#newpid = os.fork() |
177 | 210 |
#if newpid == 0: |
178 | 211 |
child(sys.argv[1:]) |
Also available in: Unified diff