Revision a57c9013 kalamari.py
b/kalamari.py | ||
---|---|---|
53 | 53 |
__author__ = 'Georgios Gousios <gousiosg@gmail.com>' |
54 | 54 |
|
55 | 55 |
|
56 |
RETRY_ATTEMPTS = 3 |
|
57 |
|
|
56 | 58 |
log = logging.getLogger("kalamari") |
57 | 59 |
log.setLevel(logging.DEBUG) |
58 | 60 |
ch = logging.StreamHandler() |
... | ... | |
67 | 69 |
chan = None |
68 | 70 |
conn = None |
69 | 71 |
closing = False |
72 |
amqp_nodes = None |
|
73 |
current_amqp_node = None |
|
70 | 74 |
mongo_db = None |
71 | 75 |
|
72 | 76 |
def __init__(self, opts): |
73 | 77 |
self.options = opts |
78 |
self.amqp_nodes = self.options.queue_hosts.split(",") |
|
74 | 79 |
self.connect() |
75 | 80 |
|
76 | 81 |
def connect(self): |
77 |
log.info("Attempting to connect to %s", self.options.queue_host) |
|
82 |
self.select_next_node() |
|
83 |
log.info("Attempting to connect to %s", self.current_amqp_node) |
|
78 | 84 |
credentials = pika.PlainCredentials(self.options.queue_uname, |
79 | 85 |
self.options.queue_passwd) |
80 |
params = pika.ConnectionParameters(host = self.options.queue_host,
|
|
86 |
params = pika.ConnectionParameters(host = self.current_amqp_node,
|
|
81 | 87 |
credentials = credentials, |
82 | 88 |
virtual_host= "/") |
83 |
conn = pika.SelectConnection(parameters = params, |
|
84 |
on_open_callback=self.on_connected) |
|
85 | 89 |
try: |
90 |
conn = pika.SelectConnection(parameters = params, |
|
91 |
on_open_callback=self.on_connected) |
|
86 | 92 |
conn.ioloop.start() |
87 | 93 |
except SystemExit: |
88 | 94 |
log.info("System exit caught, exiting") |
89 | 95 |
self.closing = True |
90 | 96 |
conn.close() |
97 |
except Exception, e: |
|
98 |
log.warn("Could not connect to AMQP node %s" % self.current_amqp_node) |
|
99 |
return self.connect() |
|
100 |
|
|
101 |
def select_next_node(self): |
|
102 |
if self.current_amqp_node is None: |
|
103 |
self.current_amqp_node = self.amqp_nodes[0] |
|
104 |
return |
|
105 |
|
|
106 |
if len(self.amqp_nodes) == 0: |
|
107 |
log.warn("No AMQP nodes left to connect to!") |
|
108 |
else: |
|
109 |
log.warn("Removing node %s from node list" % self.current_amqp_node) |
|
110 |
self.amqp_nodes.remove(self.current_amqp_node) |
|
111 |
self.current_amqp_node = self.amqp_nodes[0] |
|
91 | 112 |
|
92 | 113 |
def on_connected(self, connection): |
93 | 114 |
self.conn = connection |
... | ... | |
100 | 121 |
log.info("Connection to AMQP closed") |
101 | 122 |
self.conn.ioloop.stop() |
102 | 123 |
return |
103 |
|
|
124 |
|
|
104 | 125 |
log.warn("Connection closed unexpectedly, attempting reconnect") |
105 | 126 |
self.conn = None |
106 | 127 |
attemts = 0 |
107 | 128 |
while self.conn == None: |
129 |
if attemts == RETRY_ATTEMPTS: |
|
130 |
log.warn("Failed all 5 attempts to connect to %s, \ |
|
131 |
trying with remaining nodes" % self.current_amqp_node) |
|
132 |
self.select_next_node() |
|
133 |
attemts = 0 |
|
134 |
|
|
108 | 135 |
try: |
109 | 136 |
attemts += 1 |
110 | 137 |
self.connect() |
111 | 138 |
except Exception, e: |
112 |
retry = attemts * 10
|
|
113 |
log.warn("Cannot connect after %d attempts, retrying after %d" %
|
|
114 |
(attemts, retry))
|
|
139 |
retry = attemts * RETRY_ATTEMPTS
|
|
140 |
log.warn("Cannot connect to %s after %d attempts, retrying\
|
|
141 |
after %d sec" % (self.current_amqp_node, attemts, retry))
|
|
115 | 142 |
time.sleep(retry) |
116 | 143 |
|
117 | 144 |
def on_channel_open(self, channel_): |
... | ... | |
123 | 150 |
# Declare a queue |
124 | 151 |
self.chan.queue_declare(queue="log", durable=True, |
125 | 152 |
exclusive=False, auto_delete=False, |
126 |
callback=self.on_queue_declared) |
|
153 |
callback=self.on_queue_declared, |
|
154 |
arguments = {'x-ha-policy': 'all'}) |
|
127 | 155 |
|
128 | 156 |
def on_queue_declared(self, frame): |
129 | 157 |
log.info("Queue declared") |
... | ... | |
132 | 160 |
|
133 | 161 |
log.info("Binding %s(%s) to queue %s with handler %s", |
134 | 162 |
"log", self.options.queue_exchange, "log", "write_msg") |
135 |
|
|
136 | 163 |
self.chan.basic_consume(self.write_msg, queue='log') |
137 | 164 |
|
138 | 165 |
def write_msg(self, channel, method, header, body): |
... | ... | |
207 | 234 |
parser.add_argument("-b", "--queue-password", required=True, |
208 | 235 |
default="", dest="queue_passwd", |
209 | 236 |
help="Password to connect to the queue") |
210 |
parser.add_argument("-c", "--queue-host", required=True, |
|
211 |
default="127.0.0.1", dest="queue_host", |
|
212 |
help="Host running the queue")
|
|
237 |
parser.add_argument("-c", "--queue-hosts", required=True,
|
|
238 |
default="127.0.0.1", dest="queue_hosts",
|
|
239 |
help="Comma seperated list of hosts running the AMQP broker")
|
|
213 | 240 |
parser.add_argument("-e", "--queue-exchange", required=True, |
214 | 241 |
default="", dest="queue_exchange", |
215 | 242 |
help="Exchange to bind to") |
Also available in: Unified diff