Revision b1bb9251

b/snf-common/synnefo/lib/amqp_puka.py
66 66
            return func(self, *args, **kwargs)
67 67
        except (socket_error, spec_exceptions.ConnectionForced) as e:
68 68
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
69
            self.consume_promises = []
70 69
            self.connect()
71 70

  
72 71
    return wrapper
......
140 139

  
141 140
        logger.info('Creating channel')
142 141

  
142
        # Clear consume_promises each time connecting, since they are related
143
        # to the connection object
144
        self.consume_promises = []
145

  
143 146
        if self.unacked:
144 147
            self._resend_unacked_messages()
145 148

  
......
156 159
            for exchange, type in exchanges:
157 160
                self.exchange_declare(exchange, type)
158 161

  
162
    @reconnect_decorator
163
    def reconnect(self):
164
        self.close()
165
        self.connect()
166

  
159 167
    def exchange_declare(self, exchange, type='direct'):
160 168
        """Declare an exchange
161 169
        @type exchange_name: string
......
321 329

  
322 330
        """
323 331
        if promise is not None:
324
            self.client.wait(promise, timeout)
332
            return self.client.wait(promise, timeout)
325 333
        else:
326
            self.client.wait(self.consume_promises)
334
            return self.client.wait(self.consume_promises, timeout)
327 335

  
328 336
    @reconnect_decorator
329 337
    def basic_get(self, queue):
......
352 360

  
353 361
    def close(self):
354 362
        """Check that messages have been send and close the connection."""
363
        logger.debug("Closing connection to %s", self.client.host)
355 364
        try:
356 365
            if self.confirms:
357 366
                self.get_confirms()
b/snf-cyclades-app/synnefo/logic/dispatcher.py
82 82

  
83 83
    def wait(self):
84 84
        log.info("Waiting for messages..")
85
        timeout = 600
85 86
        while True:
86 87
            try:
87 88
                # Close the Django DB connection before processing
......
90 91
                # the dispatcher to recover from broken connections
91 92
                # gracefully.
92 93
                close_connection()
93
                self.client.basic_wait()
94
                msg = self.client.basic_wait(timeout=timeout)
95
                if not msg:
96
                    log.warning("Idle connection for %d seconds. Will connect"
97
                                " to a different host. Verify that"
98
                                " snf-ganeti-eventd is running!!", timeout)
99
                    self.client.reconnect()
94 100
            except SystemExit:
95 101
                break
96 102
            except Exception as e:

Also available in: Unified diff