38 |
38 |
logger = None
|
39 |
39 |
chan = None
|
40 |
40 |
|
41 |
|
def __init__(self, debug, logger):
|
|
41 |
def __init__(self, debug = False, logger = None):
|
42 |
42 |
self.logger = logger
|
43 |
43 |
self._init_queues(debug)
|
44 |
44 |
|
... | ... | |
89 |
89 |
self.chan.connection.close()
|
90 |
90 |
|
91 |
91 |
def _declare_queues(self):
|
92 |
|
self.chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="direct", durable=True, auto_delete=False)
|
93 |
|
self.chan.exchange_declare(exchange=settings.EXCHANGE_CRON, type="topic", durable=True, auto_delete=False)
|
94 |
|
self.chan.exchange_declare(exchange=settings.EXCHANGE_API, type="topic", durable=True, auto_delete=False)
|
95 |
92 |
|
96 |
|
self.chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False)
|
97 |
|
self.chan.queue_declare(queue=settings.QUEUE_CRON_CREDITS, durable=True, exclusive=False, auto_delete=False)
|
98 |
|
self.chan.queue_declare(queue=settings.QUEUE_API_EMAIL, durable=True, exclusive=False, auto_delete=False)
|
99 |
|
self.chan.queue_declare(queue=settings.QUEUE_CRON_EMAIL, durable=True, exclusive=False, auto_delete=False)
|
|
93 |
for exchange in settings.EXCHANGES:
|
|
94 |
self.chan.exchange_declare(exchange=exchange, type="direct", durable=True, auto_delete=False)
|
|
95 |
|
|
96 |
for queue in settings.QUEUES:
|
|
97 |
self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
|
100 |
98 |
|
101 |
99 |
def _init_queues(self,debug):
|
102 |
100 |
self._open_channel()
|
... | ... | |
166 |
164 |
metavar="FILE",
|
167 |
165 |
help="Write log to FILE instead of %s" %
|
168 |
166 |
settings.DISPATCHER_LOG_FILE)
|
169 |
|
|
|
167 |
parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
|
|
168 |
help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
|
|
169 |
|
170 |
170 |
return parser.parse_args(args)
|
171 |
171 |
|
|
172 |
def cleanup_queues() :
|
|
173 |
|
|
174 |
conn = amqp.Connection( host=settings.RABBIT_HOST,
|
|
175 |
userid=settings.RABBIT_USERNAME,
|
|
176 |
password=settings.RABBIT_PASSWORD,
|
|
177 |
virtual_host=settings.RABBIT_VHOST)
|
|
178 |
chan = conn.channel()
|
|
179 |
|
|
180 |
print "Queues to be deleted: ", settings.QUEUES
|
|
181 |
print "Exchnages to be deleted: ", settings.EXCHANGES
|
|
182 |
ans = raw_input("Are you sure (N/y):")
|
|
183 |
|
|
184 |
if not ans:
|
|
185 |
return
|
|
186 |
if ans not in ['Y', 'y']:
|
|
187 |
return
|
|
188 |
|
|
189 |
for exchange in settings.EXCHANGES:
|
|
190 |
try:
|
|
191 |
chan.exchange_delete(exchange=exchange)
|
|
192 |
except amqp.exceptions.AMQPChannelException as e:
|
|
193 |
print e.amqp_reply_code, " ", e.amqp_reply_text
|
|
194 |
|
|
195 |
for queue in settings.QUEUES:
|
|
196 |
try:
|
|
197 |
chan.queue_delete(queue=queue)
|
|
198 |
except amqp.exceptions.AMQPChannelException as e:
|
|
199 |
print e.amqp_reply_code, " ", e.amqp_reply_text
|
|
200 |
|
172 |
201 |
def main():
|
173 |
202 |
global logger
|
174 |
203 |
(opts, args) = parse_arguments(sys.argv[1:])
|
175 |
204 |
|
|
205 |
if opts.cleanup_queues:
|
|
206 |
cleanup_queues()
|
|
207 |
return
|
|
208 |
|
176 |
209 |
#newpid = os.fork()
|
177 |
210 |
#if newpid == 0:
|
178 |
211 |
child(sys.argv[1:])
|