Revision a57c9013 kalamari.py

b/kalamari.py
53 53
__author__ = 'Georgios Gousios <gousiosg@gmail.com>'
54 54

  
55 55

  
56
RETRY_ATTEMPTS = 3
57

  
56 58
log = logging.getLogger("kalamari")
57 59
log.setLevel(logging.DEBUG)
58 60
ch = logging.StreamHandler()
......
67 69
    chan = None
68 70
    conn = None
69 71
    closing = False
72
    amqp_nodes = None
73
    current_amqp_node = None
70 74
    mongo_db = None
71 75

  
72 76
    def __init__(self, opts):
73 77
        self.options = opts
78
        self.amqp_nodes = self.options.queue_hosts.split(",")
74 79
        self.connect()
75 80

  
76 81
    def connect(self):
77
        log.info("Attempting to connect to %s", self.options.queue_host)
82
        self.select_next_node()
83
        log.info("Attempting to connect to %s", self.current_amqp_node)
78 84
        credentials = pika.PlainCredentials(self.options.queue_uname,
79 85
                                            self.options.queue_passwd)
80
        params = pika.ConnectionParameters(host = self.options.queue_host,
86
        params = pika.ConnectionParameters(host = self.current_amqp_node,
81 87
                                           credentials = credentials,
82 88
                                           virtual_host= "/")
83
        conn = pika.SelectConnection(parameters = params,
84
                                     on_open_callback=self.on_connected)
85 89
        try:
90
            conn = pika.SelectConnection(parameters = params,
91
                                         on_open_callback=self.on_connected)
86 92
            conn.ioloop.start()
87 93
        except SystemExit:
88 94
            log.info("System exit caught, exiting")
89 95
            self.closing = True
90 96
            conn.close()
97
        except Exception, e:
98
            log.warn("Could not connect to AMQP node %s" % self.current_amqp_node)
99
            return self.connect()
100

  
101
    def select_next_node(self):
102
        if self.current_amqp_node is None:
103
            self.current_amqp_node = self.amqp_nodes[0]
104
            return
105

  
106
        if len(self.amqp_nodes) == 0:
107
            log.warn("No AMQP nodes left to connect to!")
108
        else:
109
            log.warn("Removing node %s from node list" % self.current_amqp_node)
110
            self.amqp_nodes.remove(self.current_amqp_node)
111
            self.current_amqp_node = self.amqp_nodes[0]
91 112

  
92 113
    def on_connected(self, connection):
93 114
        self.conn = connection
......
100 121
            log.info("Connection to AMQP closed")
101 122
            self.conn.ioloop.stop()
102 123
            return
103
        
124

  
104 125
        log.warn("Connection closed unexpectedly, attempting reconnect")
105 126
        self.conn = None
106 127
        attemts = 0
107 128
        while self.conn == None:
129
            if attemts == RETRY_ATTEMPTS:
130
                log.warn("Failed all 5 attempts to connect to %s, \
131
                          trying with remaining nodes" % self.current_amqp_node)
132
                self.select_next_node()
133
                attemts = 0
134

  
108 135
            try:
109 136
                attemts += 1
110 137
                self.connect()
111 138
            except Exception, e:
112
                retry = attemts * 10
113
                log.warn("Cannot connect after %d attempts, retrying after %d" %
114
                         (attemts, retry))
139
                retry = attemts * RETRY_ATTEMPTS
140
                log.warn("Cannot connect to %s after %d attempts, retrying\
141
                after %d sec" % (self.current_amqp_node, attemts, retry))
115 142
                time.sleep(retry)
116 143

  
117 144
    def on_channel_open(self, channel_):
......
123 150
        # Declare a queue
124 151
        self.chan.queue_declare(queue="log", durable=True,
125 152
                                exclusive=False, auto_delete=False,
126
                                callback=self.on_queue_declared)
153
                                callback=self.on_queue_declared,
154
                                arguments = {'x-ha-policy': 'all'})
127 155

  
128 156
    def on_queue_declared(self, frame):
129 157
        log.info("Queue declared")
......
132 160

  
133 161
        log.info("Binding %s(%s) to queue %s with handler %s",
134 162
                  "log", self.options.queue_exchange, "log", "write_msg")
135

  
136 163
        self.chan.basic_consume(self.write_msg, queue='log')
137 164

  
138 165
    def write_msg(self, channel, method, header, body):
......
207 234
    parser.add_argument("-b", "--queue-password", required=True,
208 235
                      default="", dest="queue_passwd", 
209 236
                      help="Password to connect to the queue")
210
    parser.add_argument("-c", "--queue-host", required=True,
211
                      default="127.0.0.1", dest="queue_host",
212
                      help="Host running the queue")
237
    parser.add_argument("-c", "--queue-hosts", required=True,
238
                      default="127.0.0.1", dest="queue_hosts",
239
                      help="Comma seperated list of hosts running the AMQP broker")
213 240
    parser.add_argument("-e", "--queue-exchange", required=True,
214 241
                      default="", dest="queue_exchange",
215 242
                      help="Exchange to bind to")

Also available in: Unified diff